/**
 * @file TailEventLog.cpp
 * TailEventLog class implementation
 *
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "TailEventLog.h"
#include <vector>
#include <queue>
#include <map>
#include <set>
#include <sstream>
#include <stdio.h>
#include <string>
#include <iostream>

#include "io/DataStream.h"
#include "core/ProcessContext.h"
#include "core/ProcessSession.h"


namespace org {
namespace apache {
namespace nifi {
namespace minifi {
namespace processors {

const std::string TailEventLog::ProcessorName("TailEventLog");
core::Relationship TailEventLog::Success("success", "All files, containing log events, are routed to success");
core::Property TailEventLog::LogSourceFileName("Log Source", "Log Source from which to read events", "");
core::Property TailEventLog::MaxEventsPerFlowFile("Max Events Per FlowFile", "Events per flow file", "1");
void TailEventLog::initialize() {
  //! Set the supported properties
  std::set<core::Property> properties;
  properties.insert(LogSourceFileName);
  properties.insert(MaxEventsPerFlowFile);
  setSupportedProperties(properties);
  //! Set the supported relationships
  std::set<core::Relationship> relationships;
  relationships.insert(Success);
  setSupportedRelationships(relationships);
}

void TailEventLog::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {
  std::string value;

  if (context->getProperty(LogSourceFileName.getName(), value)) {
    log_source_ = value;
  }
  if (context->getProperty(MaxEventsPerFlowFile.getName(), value)) {
    core::Property::StringToInt(value, max_events_);
  }

  log_handle_ = OpenEventLog(NULL, log_source_.c_str());

  logger_->log_trace("TailEventLog configured to tail %s",log_source_);

}

void TailEventLog::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) {
  
  if (log_handle_ == nullptr) {
    logger_->log_debug("Handle could not be created for %s", log_source_);
  }

  BYTE buffer[MAX_RECORD_BUFFER_SIZE];

  EVENTLOGRECORD *event_record = (EVENTLOGRECORD*)&buffer;

  DWORD bytes_to_read = 0, min_bytes = 0;
  
  GetOldestEventLogRecord(log_handle_, &current_record_);
  GetNumberOfEventLogRecords(log_handle_, &num_records_);
  current_record_ = num_records_-max_events_;
  
  logger_->log_trace("%d and %d", current_record_, num_records_);

  if (ReadEventLog(log_handle_,EVENTLOG_FORWARDS_READ | EVENTLOG_SEEK_READ, current_record_, event_record,MAX_RECORD_BUFFER_SIZE, &bytes_to_read,&min_bytes))
  {
    if (bytes_to_read == 0) {
      logger_->log_debug("Yielding");
      context->yield();
    }
    while (bytes_to_read > 0)
    {

      std::shared_ptr<FlowFileRecord> flowFile = std::static_pointer_cast<FlowFileRecord>(session->create());
      if (flowFile == nullptr)
        return;

      LPSTR source =
        (LPSTR)((LPBYTE)event_record + sizeof(EVENTLOGRECORD));

      LPSTR computer_name =
        (LPSTR)((LPBYTE)event_record + sizeof(EVENTLOGRECORD) +
          strlen(source) + 1);

      flowFile->addAttribute("source", source);
      flowFile->addAttribute("record_number", std::to_string( event_record->RecordNumber));
      flowFile->addAttribute("computer_name", computer_name);
      
      flowFile->addAttribute("event_time", getTimeStamp(event_record->TimeGenerated));
      flowFile->addAttribute("event_type", typeToString(event_record->EventType));
      //flowFile->addAttribute("", event_message);

      
      io::DataStream stream((const uint8_t*)(event_record + event_record->DataOffset), event_record->DataLength);
      // need an import from the data stream.
      session->importFrom(stream, flowFile);
      session->transfer(flowFile, Success);
      bytes_to_read -= event_record->Length;
      event_record = (EVENTLOGRECORD *)
        ((LPBYTE)event_record + event_record->Length);
    }

    event_record = (EVENTLOGRECORD *)&buffer;
    logger_->log_trace("All done no more");
  }
  else {
    LogWindowsError();
    logger_->log_trace("Yielding due to error");
    context->yield();
  }

}
} /* namespace processors */
} /* namespace minifi */
} /* namespace nifi */
} /* namespace apache */
} /* namespace org */
