/**
 * @file TailEventLog.cpp
 * TailEventLog class implementation
 *
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "TailEventLog.h"
#include <vector>
#include <queue>
#include <map>
#include <set>
#include <sstream>
#include <stdio.h>
#include <string>
#include <iostream>

#include "io/DataStream.h"
#include "core/ProcessContext.h"
#include "core/ProcessSession.h"


namespace org {
namespace apache {
namespace nifi {
namespace minifi {
namespace processors {

const std::string TailEventLog::ProcessorName("TailEventLog");
core::Relationship TailEventLog::Success("success", "All files, containing log events, are routed to success");
core::Property TailEventLog::LogSourceFileName("Log Source", "Log Source from which to read events", "");
core::Property TailEventLog::MaxEventsPerFlowFile("Max Events Per FlowFile", "Events per flow file", "1");
void TailEventLog::initialize() {
  //! Set the supported properties
  std::set<core::Property> properties;
  properties.insert(LogSourceFileName);
  properties.insert(MaxEventsPerFlowFile);
  setSupportedProperties(properties);
  //! Set the supported relationships
  std::set<core::Relationship> relationships;
  relationships.insert(Success);
  setSupportedRelationships(relationships);
}

void TailEventLog::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {
  std::string value;

  if (context->getProperty(LogSourceFileName.getName(), value)) {
    log_source_ = value;
  }
  if (context->getProperty(MaxEventsPerFlowFile.getName(), value)) {
    core::Property::StringToInt(value, max_events_);
  }

  log_handle_ = OpenEventLog(NULL, log_source_.c_str());

  logger_->log_trace("TailEventLog configured to tail %s",log_source_);

}

void TailEventLog::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) {
  
	if (log_handle_ == nullptr) {
		logger_->log_debug("Handle could not be created for %s", log_source_);
	}

	BYTE buffer[MAX_RECORD_BUFFER_SIZE];

	EVENTLOGRECORD *event_record = (EVENTLOGRECORD*)&buffer;

	DWORD bytes_to_read = 0, min_bytes = 0;
	
	GetOldestEventLogRecord(log_handle_, &current_record_);
	GetNumberOfEventLogRecords(log_handle_, &num_records_);
	current_record_ = num_records_-max_events_;
	
	logger_->log_trace("%d and %d", current_record_, num_records_);

	if (ReadEventLog(log_handle_,EVENTLOG_FORWARDS_READ | EVENTLOG_SEEK_READ, current_record_, event_record,MAX_RECORD_BUFFER_SIZE, &bytes_to_read,&min_bytes))
	{
		if (bytes_to_read == 0) {
			logger_->log_debug("Yielding");
			context->yield();
		}
		while (bytes_to_read > 0)
		{

			std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast<FlowFileRecord>(session->create());
			if (flowFile == nullptr)
				return;

			LPSTR source =
				(LPSTR)((LPBYTE)event_record + sizeof(EVENTLOGRECORD));

			LPSTR computer_name =
				(LPSTR)((LPBYTE)event_record + sizeof(EVENTLOGRECORD) +
					strlen(source) + 1);

			flowFile->addAttribute("source", source);
			flowFile->addAttribute("record_number", std::to_string( event_record->RecordNumber));
			flowFile->addAttribute("computer_name", computer_name);
			
			flowFile->addAttribute("event_time", getTimeStamp(event_record->TimeGenerated));
			flowFile->addAttribute("event_type", typeToString(event_record->EventType));
			//flowFile->addAttribute("", event_message);

			
			io::DataStream stream((const uint8_t*)(event_record + event_record->DataOffset), event_record->DataLength);
			// need an import from the data stream.
			session->importFrom(stream, flowFile);
			session->transfer(flowFile, Success);
			bytes_to_read -= event_record->Length;
			event_record = (EVENTLOGRECORD *)
				((LPBYTE)event_record + event_record->Length);
		}

		event_record = (EVENTLOGRECORD *)&buffer;
		logger_->log_trace("All done no more");
	}
	else {
		LogWindowsError();
		logger_->log_trace("Yielding due to error");
		context->yield();
	}

}
} /* namespace processors */
} /* namespace minifi */
} /* namespace nifi */
} /* namespace apache */
} /* namespace org */