| /** |
| * @file Provenance.h |
| * Flow file record class declaration |
| * |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| #ifndef __PROVENANCE_H__ |
| #define __PROVENANCE_H__ |
| |
| #include <stdio.h> |
| #include <errno.h> |
| #include <unistd.h> |
| #include <sys/types.h> |
| #include <sys/stat.h> |
| #include <uuid/uuid.h> |
| #include <vector> |
| #include <queue> |
| #include <map> |
| #include <mutex> |
| #include <atomic> |
| #include <set> |
| #include <cassert> |
| #include <errno.h> |
| #include <chrono> |
| #include <thread> |
| #include <ftw.h> |
| #include "leveldb/db.h" |
| |
| #include "TimeUtil.h" |
| #include "Logger.h" |
| #include "Configure.h" |
| #include "Property.h" |
| #include "ResourceClaim.h" |
| #include "Relationship.h" |
| #include "Connection.h" |
| #include "FlowFileRecord.h" |
| |
| // Provenance Event Record Serialization Seg Size |
| #define PROVENANCE_EVENT_RECORD_SEG_SIZE 2048 |
| |
| class ProvenanceRepository; |
| |
| //! Provenance Event Record |
| class ProvenanceEventRecord |
| { |
| public: |
| enum ProvenanceEventType { |
| |
| /** |
| * A CREATE event is used when a FlowFile is generated from data that was |
| * not received from a remote system or external process |
| */ |
| CREATE, |
| |
| /** |
| * Indicates a provenance event for receiving data from an external process. This Event Type |
| * is expected to be the first event for a FlowFile. As such, a Processor that receives data |
| * from an external source and uses that data to replace the content of an existing FlowFile |
| * should use the {@link #FETCH} event type, rather than the RECEIVE event type. |
| */ |
| RECEIVE, |
| |
| /** |
| * Indicates that the contents of a FlowFile were overwritten using the contents of some |
| * external resource. This is similar to the {@link #RECEIVE} event but varies in that |
| * RECEIVE events are intended to be used as the event that introduces the FlowFile into |
| * the system, whereas FETCH is used to indicate that the contents of an existing FlowFile |
| * were overwritten. |
| */ |
| FETCH, |
| |
| /** |
| * Indicates a provenance event for sending data to an external process |
| */ |
| SEND, |
| |
| /** |
| * Indicates that the contents of a FlowFile were downloaded by a user or external entity. |
| */ |
| DOWNLOAD, |
| |
| /** |
| * Indicates a provenance event for the conclusion of an object's life for |
| * some reason other than object expiration |
| */ |
| DROP, |
| |
| /** |
| * Indicates a provenance event for the conclusion of an object's life due |
| * to the fact that the object could not be processed in a timely manner |
| */ |
| EXPIRE, |
| |
| /** |
| * FORK is used to indicate that one or more FlowFile was derived from a |
| * parent FlowFile. |
| */ |
| FORK, |
| |
| /** |
| * JOIN is used to indicate that a single FlowFile is derived from joining |
| * together multiple parent FlowFiles. |
| */ |
| JOIN, |
| |
| /** |
| * CLONE is used to indicate that a FlowFile is an exact duplicate of its |
| * parent FlowFile. |
| */ |
| CLONE, |
| |
| /** |
| * CONTENT_MODIFIED is used to indicate that a FlowFile's content was |
| * modified in some way. When using this Event Type, it is advisable to |
| * provide details about how the content is modified. |
| */ |
| CONTENT_MODIFIED, |
| |
| /** |
| * ATTRIBUTES_MODIFIED is used to indicate that a FlowFile's attributes were |
| * modified in some way. This event is not needed when another event is |
| * reported at the same time, as the other event will already contain all |
| * FlowFile attributes. |
| */ |
| ATTRIBUTES_MODIFIED, |
| |
| /** |
| * ROUTE is used to show that a FlowFile was routed to a specified |
| * {@link org.apache.nifi.processor.Relationship Relationship} and should provide |
| * information about why the FlowFile was routed to this relationship. |
| */ |
| ROUTE, |
| |
| /** |
| * Indicates a provenance event for adding additional information such as a |
| * new linkage to a new URI or UUID |
| */ |
| ADDINFO, |
| |
| /** |
| * Indicates a provenance event for replaying a FlowFile. The UUID of the |
| * event will indicate the UUID of the original FlowFile that is being |
| * replayed. The event will contain exactly one Parent UUID that is also the |
| * UUID of the FlowFile that is being replayed and exactly one Child UUID |
| * that is the UUID of the a newly created FlowFile that will be re-queued |
| * for processing. |
| */ |
| REPLAY |
| }; |
| friend class ProcessSession; |
| public: |
| //! Constructor |
| /*! |
| * Create a new provenance event record |
| */ |
| ProvenanceEventRecord(ProvenanceEventType event, std::string componentId, std::string componentType) { |
| _eventType = event; |
| _componentId = componentId; |
| _componentType = componentType; |
| _eventTime = getTimeMillis(); |
| char eventIdStr[37]; |
| // Generate the global UUID for th event |
| uuid_generate(_eventId); |
| uuid_unparse(_eventId, eventIdStr); |
| _eventIdStr = eventIdStr; |
| _serializedBuf = NULL; |
| _serializeBufSize = 0; |
| _maxSerializeBufSize = 0; |
| _logger = Logger::getLogger(); |
| } |
| |
| ProvenanceEventRecord() { |
| _eventTime = getTimeMillis(); |
| _serializedBuf = NULL; |
| _serializeBufSize = 0; |
| _maxSerializeBufSize = 0; |
| _logger = Logger::getLogger(); |
| } |
| |
| //! Destructor |
| virtual ~ProvenanceEventRecord() { |
| } |
| //! Get the Event ID |
| std::string getEventId() { |
| return _eventIdStr; |
| } |
| //! Get Attributes |
| std::map<std::string, std::string> getAttributes() { |
| return _attributes; |
| } |
| //! Get Size |
| uint64_t getFileSize() { |
| return _size; |
| } |
| // ! Get Offset |
| uint64_t getFileOffset() { |
| return _offset; |
| } |
| // ! Get Entry Date |
| uint64_t getFlowFileEntryDate() { |
| return _entryDate; |
| } |
| // ! Get Lineage Start Date |
| uint64_t getlineageStartDate() { |
| return _lineageStartDate; |
| } |
| // ! Get Event Time |
| uint64_t getEventTime() { |
| return _eventTime; |
| } |
| // ! Get Event Duration |
| uint64_t getEventDuration() { |
| return _eventDuration; |
| } |
| //! Set Event Duration |
| void setEventDuration(uint64_t duration) |
| { |
| _eventDuration = duration; |
| } |
| // ! Get Event Type |
| ProvenanceEventType getEventType() { |
| return _eventType; |
| } |
| //! Get Component ID |
| std::string getComponentId() |
| { |
| return _componentId; |
| } |
| //! Get Component Type |
| std::string getComponentType() |
| { |
| return _componentType; |
| } |
| //! Get FlowFileUuid |
| std::string getFlowFileUuid() |
| { |
| return _uuid; |
| } |
| //! Get content full path |
| std::string getContentFullPath() |
| { |
| return _contentFullPath; |
| } |
| //! Get LineageIdentifiers |
| std::set<std::string> getLineageIdentifiers() |
| { |
| return _lineageIdentifiers; |
| } |
| //! Get Details |
| std::string getDetails() |
| { |
| return _details; |
| } |
| //! Set Details |
| void setDetails(std::string details) |
| { |
| _details = details; |
| } |
| //! Get TransitUri |
| std::string getTransitUri() |
| { |
| return _transitUri; |
| } |
| //! Set TransitUri |
| void setTransitUri(std::string uri) |
| { |
| _transitUri = uri; |
| } |
| //! Get SourceSystemFlowFileIdentifier |
| std::string getSourceSystemFlowFileIdentifier() |
| { |
| return _sourceSystemFlowFileIdentifier; |
| } |
| //! Set SourceSystemFlowFileIdentifier |
| void setSourceSystemFlowFileIdentifier(std::string identifier) |
| { |
| _sourceSystemFlowFileIdentifier = identifier; |
| } |
| //! Get Parent UUIDs |
| std::vector<std::string> getParentUuids() |
| { |
| return _parentUuids; |
| } |
| //! Add Parent UUID |
| void addParentUuid(std::string uuid) |
| { |
| if (std::find(_parentUuids.begin(), _parentUuids.end(), uuid) != _parentUuids.end()) |
| return; |
| else |
| _parentUuids.push_back(uuid); |
| } |
| //! Add Parent Flow File |
| void addParentFlowFile(FlowFileRecord *flow) |
| { |
| addParentUuid(flow->getUUIDStr()); |
| return; |
| } |
| //! Remove Parent UUID |
| void removeParentUuid(std::string uuid) |
| { |
| _parentUuids.erase(std::remove(_parentUuids.begin(), _parentUuids.end(), uuid), _parentUuids.end()); |
| } |
| //! Remove Parent Flow File |
| void removeParentFlowFile(FlowFileRecord *flow) |
| { |
| removeParentUuid(flow->getUUIDStr()); |
| return; |
| } |
| //! Get Children UUIDs |
| std::vector<std::string> getChildrenUuids() |
| { |
| return _childrenUuids; |
| } |
| //! Add Child UUID |
| void addChildUuid(std::string uuid) |
| { |
| if (std::find(_childrenUuids.begin(), _childrenUuids.end(), uuid) != _childrenUuids.end()) |
| return; |
| else |
| _childrenUuids.push_back(uuid); |
| } |
| //! Add Child Flow File |
| void addChildFlowFile(FlowFileRecord *flow) |
| { |
| addChildUuid(flow->getUUIDStr()); |
| return; |
| } |
| //! Remove Child UUID |
| void removeChildUuid(std::string uuid) |
| { |
| _childrenUuids.erase(std::remove(_childrenUuids.begin(), _childrenUuids.end(), uuid), _childrenUuids.end()); |
| } |
| //! Remove Child Flow File |
| void removeChildFlowFile(FlowFileRecord *flow) |
| { |
| removeChildUuid(flow->getUUIDStr()); |
| return; |
| } |
| //! Get AlternateIdentifierUri |
| std::string getAlternateIdentifierUri() |
| { |
| return _alternateIdentifierUri; |
| } |
| //! Set AlternateIdentifierUri |
| void setAlternateIdentifierUri(std::string uri) |
| { |
| _alternateIdentifierUri = uri; |
| } |
| //! Get Relationship |
| std::string getRelationship() |
| { |
| return _relationship; |
| } |
| //! Set Relationship |
| void setRelationship(std::string relation) |
| { |
| _relationship = relation; |
| } |
| //! Get sourceQueueIdentifier |
| std::string getSourceQueueIdentifier() |
| { |
| return _sourceQueueIdentifier; |
| } |
| //! Set sourceQueueIdentifier |
| void setSourceQueueIdentifier(std::string identifier) |
| { |
| _sourceQueueIdentifier = identifier; |
| } |
| //! fromFlowFile |
| void fromFlowFile(FlowFileRecord *flow) |
| { |
| _entryDate = flow->getEntryDate(); |
| _lineageStartDate = flow->getlineageStartDate(); |
| _lineageIdentifiers = flow->getlineageIdentifiers(); |
| _uuid = flow->getUUIDStr(); |
| _attributes = flow->getAttributes(); |
| _size = flow->getSize(); |
| _offset = flow->getOffset(); |
| if (flow->getOriginalConnection()) |
| _sourceQueueIdentifier = flow->getOriginalConnection()->getName(); |
| if (flow->getResourceClaim()) |
| { |
| _contentFullPath = flow->getResourceClaim()->getContentFullPath(); |
| } |
| } |
| //! Serialize and Persistent to the repository |
| bool Serialize(ProvenanceRepository *repo); |
| //! DeSerialize |
| bool DeSerialize(uint8_t *buffer, int bufferSize); |
| //! DeSerialize |
| bool DeSerialize(ProvenanceRepository *repo, std::string key); |
| |
| protected: |
| |
| //! Event type |
| ProvenanceEventType _eventType; |
| //! Date at which the event was created |
| uint64_t _eventTime; |
| //! Date at which the flow file entered the flow |
| uint64_t _entryDate; |
| //! Date at which the origin of this flow file entered the flow |
| uint64_t _lineageStartDate; |
| //! Event Duration |
| uint64_t _eventDuration; |
| //! Component ID |
| std::string _componentId; |
| //! Component Type |
| std::string _componentType; |
| //! Size in bytes of the data corresponding to this flow file |
| uint64_t _size; |
| //! flow uuid |
| std::string _uuid; |
| //! Offset to the content |
| uint64_t _offset; |
| //! Full path to the content |
| std::string _contentFullPath; |
| //! Attributes key/values pairs for the flow record |
| std::map<std::string, std::string> _attributes; |
| //! provenance ID |
| uuid_t _eventId; |
| //! UUID string for all parents |
| std::set<std::string> _lineageIdentifiers; |
| //! transitUri |
| std::string _transitUri; |
| //! sourceSystemFlowFileIdentifier |
| std::string _sourceSystemFlowFileIdentifier; |
| //! parent UUID |
| std::vector<std::string> _parentUuids; |
| //! child UUID |
| std::vector<std::string> _childrenUuids; |
| //! detail |
| std::string _details; |
| //! sourceQueueIdentifier |
| std::string _sourceQueueIdentifier; |
| //! event ID Str |
| std::string _eventIdStr; |
| //! relationship |
| std::string _relationship; |
| //! alternateIdentifierUri; |
| std::string _alternateIdentifierUri; |
| |
| private: |
| |
| //! Logger |
| Logger *_logger; |
| // All serialization related method and internal buf |
| uint8_t *_serializedBuf; |
| int _serializeBufSize; |
| int _maxSerializeBufSize; |
| int writeData(uint8_t *value, int size) |
| { |
| if ((_serializeBufSize + size) > _maxSerializeBufSize) |
| { |
| // if write exceed |
| uint8_t *buffer = new uint8_t[_maxSerializeBufSize + PROVENANCE_EVENT_RECORD_SEG_SIZE]; |
| if (!buffer) |
| { |
| return -1; |
| } |
| memcpy(buffer, _serializedBuf, _serializeBufSize); |
| delete[] _serializedBuf; |
| _serializedBuf = buffer; |
| _maxSerializeBufSize = _maxSerializeBufSize + PROVENANCE_EVENT_RECORD_SEG_SIZE; |
| } |
| uint8_t *bufPtr = _serializedBuf + _serializeBufSize; |
| memcpy(bufPtr, value, size); |
| _serializeBufSize += size; |
| return size; |
| } |
| int readData(uint8_t *buf, int buflen) |
| { |
| if ((buflen + _serializeBufSize) > _maxSerializeBufSize) |
| { |
| // if read exceed |
| return -1; |
| } |
| uint8_t *bufPtr = _serializedBuf + _serializeBufSize; |
| memcpy(buf, bufPtr, buflen); |
| _serializeBufSize += buflen; |
| return buflen; |
| } |
| int write(uint8_t value) |
| { |
| return writeData(&value, 1); |
| } |
| int write(char value) |
| { |
| return writeData((uint8_t *)&value, 1); |
| } |
| int write(uint32_t value) |
| { |
| uint8_t temp[4]; |
| |
| temp[0] = (value & 0xFF000000) >> 24; |
| temp[1] = (value & 0x00FF0000) >> 16; |
| temp[2] = (value & 0x0000FF00) >> 8; |
| temp[3] = (value & 0x000000FF); |
| return writeData(temp, 4); |
| } |
| int write(uint16_t value) |
| { |
| uint8_t temp[2]; |
| temp[0] = (value & 0xFF00) >> 8; |
| temp[1] = (value & 0xFF); |
| return writeData(temp, 2); |
| } |
| int write(uint8_t *value, int len) |
| { |
| return writeData(value, len); |
| } |
| int write(uint64_t value) |
| { |
| uint8_t temp[8]; |
| |
| temp[0] = (value >> 56) & 0xFF; |
| temp[1] = (value >> 48) & 0xFF; |
| temp[2] = (value >> 40) & 0xFF; |
| temp[3] = (value >> 32) & 0xFF; |
| temp[4] = (value >> 24) & 0xFF; |
| temp[5] = (value >> 16) & 0xFF; |
| temp[6] = (value >> 8) & 0xFF; |
| temp[7] = (value >> 0) & 0xFF; |
| return writeData(temp, 8); |
| } |
| int write(bool value) |
| { |
| uint8_t temp = value; |
| return write(temp); |
| } |
| int writeUTF(std::string str, bool widen = false); |
| int read(uint8_t &value) |
| { |
| uint8_t buf; |
| |
| int ret = readData(&buf, 1); |
| if (ret == 1) |
| value = buf; |
| return ret; |
| } |
| int read(uint16_t &value) |
| { |
| uint8_t buf[2]; |
| |
| int ret = readData(buf, 2); |
| if (ret == 2) |
| value = (buf[0] << 8) | buf[1]; |
| return ret; |
| } |
| int read(char &value) |
| { |
| uint8_t buf; |
| |
| int ret = readData(&buf, 1); |
| if (ret == 1) |
| value = (char) buf; |
| return ret; |
| } |
| int read(uint8_t *value, int len) |
| { |
| return readData(value, len); |
| } |
| int read(uint32_t &value) |
| { |
| uint8_t buf[4]; |
| |
| int ret = readData(buf, 4); |
| if (ret == 4) |
| value = (buf[0] << 24) | (buf[1] << 16) | (buf[2] << 8) | buf[3]; |
| return ret; |
| } |
| int read(uint64_t &value) |
| { |
| uint8_t buf[8]; |
| |
| int ret = readData(buf, 8); |
| if (ret == 8) |
| { |
| value = ((uint64_t) buf[0] << 56) | |
| ((uint64_t) (buf[1] & 255) << 48) | |
| ((uint64_t) (buf[2] & 255) << 40) | |
| ((uint64_t) (buf[3] & 255) << 32) | |
| ((uint64_t) (buf[4] & 255) << 24) | |
| ((uint64_t) (buf[5] & 255) << 16) | |
| ((uint64_t) (buf[6] & 255) << 8) | |
| ((uint64_t) (buf[7] & 255) << 0); |
| } |
| return ret; |
| } |
| int readUTF(std::string &str, bool widen = false); |
| |
| // Prevent default copy constructor and assignment operation |
| // Only support pass by reference or pointer |
| ProvenanceEventRecord(const ProvenanceEventRecord &parent); |
| ProvenanceEventRecord &operator=(const ProvenanceEventRecord &parent); |
| |
| }; |
| |
| //! Provenance Reporter |
| class ProvenanceReporter |
| { |
| friend class ProcessSession; |
| public: |
| //! Constructor |
| /*! |
| * Create a new provenance reporter associated with the process session |
| */ |
| ProvenanceReporter(std::string componentId, std::string componentType) { |
| _logger = Logger::getLogger(); |
| _componentId = componentId; |
| _componentType = componentType; |
| } |
| |
| //! Destructor |
| virtual ~ProvenanceReporter() { |
| clear(); |
| } |
| //! Get events |
| std::set<ProvenanceEventRecord *> getEvents() |
| { |
| return _events; |
| } |
| //! Add event |
| void add(ProvenanceEventRecord *event) |
| { |
| _events.insert(event); |
| } |
| //! Remove event |
| void remove(ProvenanceEventRecord *event) |
| { |
| if (_events.find(event) != _events.end()) |
| { |
| _events.erase(event); |
| } |
| } |
| //! |
| //! clear |
| void clear() |
| { |
| for (std::set<ProvenanceEventRecord*>::iterator it = _events.begin(); it != _events.end(); ++it) |
| { |
| ProvenanceEventRecord *event = (ProvenanceEventRecord *) (*it); |
| delete event; |
| } |
| _events.clear(); |
| } |
| //! allocate |
| ProvenanceEventRecord *allocate(ProvenanceEventRecord::ProvenanceEventType eventType, FlowFileRecord *flow) |
| { |
| ProvenanceEventRecord *event = new ProvenanceEventRecord(eventType, _componentId, _componentType); |
| if (event) |
| event->fromFlowFile(flow); |
| |
| return event; |
| } |
| //! commit |
| void commit(); |
| //! create |
| void create(FlowFileRecord *flow, std::string detail); |
| //! route |
| void route(FlowFileRecord *flow, Relationship relation, std::string detail, uint64_t processingDuration); |
| //! modifyAttributes |
| void modifyAttributes(FlowFileRecord *flow, std::string detail); |
| //! modifyContent |
| void modifyContent(FlowFileRecord *flow, std::string detail, uint64_t processingDuration); |
| //! clone |
| void clone(FlowFileRecord *parent, FlowFileRecord *child); |
| //! join |
| void join(std::vector<FlowFileRecord *> parents, FlowFileRecord *child, std::string detail, uint64_t processingDuration); |
| //! fork |
| void fork(std::vector<FlowFileRecord *> child, FlowFileRecord *parent, std::string detail, uint64_t processingDuration); |
| //! expire |
| void expire(FlowFileRecord *flow, std::string detail); |
| //! drop |
| void drop(FlowFileRecord *flow, std::string reason); |
| //! send |
| void send(FlowFileRecord *flow, std::string transitUri, std::string detail, uint64_t processingDuration, bool force); |
| //! fetch |
| void fetch(FlowFileRecord *flow, std::string transitUri, std::string detail, uint64_t processingDuration); |
| //! receive |
| void receive(FlowFileRecord *flow, std::string transitUri, std::string sourceSystemFlowFileIdentifier, std::string detail, uint64_t processingDuration); |
| |
| protected: |
| |
| //! Component ID |
| std::string _componentId; |
| //! Component Type |
| std::string _componentType; |
| |
| private: |
| |
| //! Incoming connection Iterator |
| std::set<ProvenanceEventRecord *> _events; |
| //! Logger |
| Logger *_logger; |
| |
| // Prevent default copy constructor and assignment operation |
| // Only support pass by reference or pointer |
| ProvenanceReporter(const ProvenanceReporter &parent); |
| ProvenanceReporter &operator=(const ProvenanceReporter &parent); |
| }; |
| |
| #define PROVENANCE_DIRECTORY "./provenance_repository" |
| #define MAX_PROVENANCE_STORAGE_SIZE (10*1024*1024) // 10M |
| #define MAX_PROVENANCE_ENTRY_LIFE_TIME (60000) // 1 minute |
| #define PROVENANCE_PURGE_PERIOD (2500) // 2500 msec |
| |
| //! Provenance Repository |
| class ProvenanceRepository |
| { |
| public: |
| //! Constructor |
| /*! |
| * Create a new provenance repository |
| */ |
| ProvenanceRepository() { |
| _logger = Logger::getLogger(); |
| _configure = Configure::getConfigure(); |
| _directory = PROVENANCE_DIRECTORY; |
| _maxPartitionMillis = MAX_PROVENANCE_ENTRY_LIFE_TIME; |
| _purgePeriod = PROVENANCE_PURGE_PERIOD; |
| _maxPartitionBytes = MAX_PROVENANCE_STORAGE_SIZE; |
| _db = NULL; |
| _running = false; |
| _repoFull = false; |
| } |
| |
| //! Destructor |
| virtual ~ProvenanceRepository() { |
| stop(); |
| if (this->_thread) |
| delete this->_thread; |
| destroy(); |
| } |
| |
| //! initialize |
| bool initialize() |
| { |
| std::string value; |
| if (_configure->get(Configure::nifi_provenance_repository_directory_default, value)) |
| { |
| _directory = value; |
| } |
| _logger->log_info("NiFi Provenance Repository Directory %s", _directory.c_str()); |
| if (_configure->get(Configure::nifi_provenance_repository_max_storage_size, value)) |
| { |
| Property::StringToInt(value, _maxPartitionBytes); |
| } |
| _logger->log_info("NiFi Provenance Max Partition Bytes %d", _maxPartitionBytes); |
| if (_configure->get(Configure::nifi_provenance_repository_max_storage_time, value)) |
| { |
| TimeUnit unit; |
| if (Property::StringToTime(value, _maxPartitionMillis, unit) && |
| Property::ConvertTimeUnitToMS(_maxPartitionMillis, unit, _maxPartitionMillis)) |
| { |
| } |
| } |
| _logger->log_info("NiFi Provenance Max Storage Time: [%d] ms", _maxPartitionMillis); |
| leveldb::Options options; |
| options.create_if_missing = true; |
| leveldb::Status status = leveldb::DB::Open(options, _directory.c_str(), &_db); |
| if (status.ok()) |
| { |
| _logger->log_info("NiFi Provenance Repository database open %s success", _directory.c_str()); |
| } |
| else |
| { |
| _logger->log_error("NiFi Provenance Repository database open %s fail", _directory.c_str()); |
| return false; |
| } |
| |
| // start the monitor thread |
| start(); |
| return true; |
| } |
| //! Put |
| bool Put(std::string key, uint8_t *buf, int bufLen) |
| { |
| // persistent to the DB |
| leveldb::Slice value((const char *) buf, bufLen); |
| leveldb::Status status; |
| status = _db->Put(leveldb::WriteOptions(), key, value); |
| if (status.ok()) |
| return true; |
| else |
| return false; |
| } |
| //! Delete |
| bool Delete(std::string key) |
| { |
| leveldb::Status status; |
| status = _db->Delete(leveldb::WriteOptions(), key); |
| if (status.ok()) |
| return true; |
| else |
| return false; |
| } |
| //! Get |
| bool Get(std::string key, std::string &value) |
| { |
| leveldb::Status status; |
| status = _db->Get(leveldb::ReadOptions(), key, &value); |
| if (status.ok()) |
| return true; |
| else |
| return false; |
| } |
| //! Persistent event |
| void registerEvent(ProvenanceEventRecord *event) |
| { |
| event->Serialize(this); |
| } |
| //! Remove event |
| void removeEvent(ProvenanceEventRecord *event) |
| { |
| Delete(event->getEventId()); |
| } |
| //! destroy |
| void destroy() |
| { |
| if (_db) |
| { |
| delete _db; |
| _db = NULL; |
| } |
| } |
| //! Run function for the thread |
| static void run(ProvenanceRepository *repo); |
| //! Start the repository monitor thread |
| void start(); |
| //! Stop the repository monitor thread |
| void stop(); |
| //! whether the repo is full |
| bool isFull() |
| { |
| return _repoFull; |
| } |
| |
| protected: |
| |
| private: |
| |
| //! Mutex for protection |
| std::mutex _mtx; |
| //! repository directory |
| std::string _directory; |
| //! Logger |
| Logger *_logger; |
| //! Configure |
| Configure *_configure; |
| //! max db entry life time |
| int64_t _maxPartitionMillis; |
| //! max db size |
| int64_t _maxPartitionBytes; |
| //! purge period |
| uint64_t _purgePeriod; |
| //! level DB database |
| leveldb::DB* _db; |
| //! thread |
| std::thread *_thread; |
| //! whether it is running |
| bool _running; |
| //! whether stop accepting provenace event |
| std::atomic<bool> _repoFull; |
| //! size of the directory |
| static uint64_t _repoSize; |
| //! call back for directory size |
| static int repoSum(const char *fpath, const struct stat *sb, int typeflag) |
| { |
| _repoSize += sb->st_size; |
| return 0; |
| } |
| //! repoSize |
| uint64_t repoSize() |
| { |
| _repoSize = 0; |
| if (ftw(_directory.c_str(), repoSum, 1) != 0) |
| _repoSize = 0; |
| |
| return _repoSize; |
| } |
| // Prevent default copy constructor and assignment operation |
| // Only support pass by reference or pointer |
| ProvenanceRepository(const ProvenanceRepository &parent); |
| ProvenanceRepository &operator=(const ProvenanceRepository &parent); |
| }; |
| |
| |
| |
| #endif |