/*
 * Copyright 2015 Twitter, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "util/tuple-cache.h"
#include <iostream>
#include <map>
#include <string>
#include "proto/messages.h"
#include "basics/basics.h"
#include "errors/errors.h"
#include "threads/threads.h"
#include "network/network.h"
#include "config/heron-internals-config-reader.h"

namespace heron {
namespace stmgr {

TupleCache::TupleCache(EventLoop* eventLoop, sp_uint32 _drain_threshold)
    : eventLoop_(eventLoop), drain_threshold_bytes_(_drain_threshold) {
  cache_drain_frequency_ms_ =
      config::HeronInternalsConfigReader::Instance()->GetHeronStreammgrCacheDrainFrequencyMs();
  tuples_cache_max_tuple_size_ =
      config::HeronInternalsConfigReader::Instance()->GetHeronStreammgrPacketMaximumSizeBytes();

  total_size_ = 0;
  auto drain_cb = [this](EventLoop::Status status) { this->drain(status); };
  eventLoop_->registerTimer(std::move(drain_cb), true, cache_drain_frequency_ms_ * 1000);
}

TupleCache::~TupleCache() {
  // Drain the cache first
  drain_impl();

  for (auto iter = cache_.begin(); iter != cache_.end(); ++iter) {
    delete iter->second;
  }
}

sp_int64 TupleCache::add_data_tuple(sp_int32 _task_id, const proto::api::StreamId& _streamid,
                                    proto::system::HeronDataTuple* _tuple) {
  if (total_size_ >= drain_threshold_bytes_) drain_impl();
  TupleList* l = get(_task_id);
  return l->add_data_tuple(_streamid, _tuple, &total_size_, &tuples_cache_max_tuple_size_);
}

void TupleCache::add_ack_tuple(sp_int32 _task_id, const proto::system::AckTuple& _tuple) {
  if (total_size_ >= drain_threshold_bytes_) drain_impl();
  TupleList* l = get(_task_id);
  return l->add_ack_tuple(_tuple, &total_size_);
}

void TupleCache::add_fail_tuple(sp_int32 _task_id, const proto::system::AckTuple& _tuple) {
  if (total_size_ >= drain_threshold_bytes_) drain_impl();
  TupleList* l = get(_task_id);
  return l->add_fail_tuple(_tuple, &total_size_);
}

void TupleCache::add_emit_tuple(sp_int32 _task_id, const proto::system::AckTuple& _tuple) {
  if (total_size_ >= drain_threshold_bytes_) drain_impl();
  TupleList* l = get(_task_id);
  return l->add_emit_tuple(_tuple, &total_size_);
}

TupleCache::TupleList* TupleCache::get(sp_int32 _task_id) {
  TupleList* l = NULL;
  auto iter = cache_.find(_task_id);
  if (iter == cache_.end()) {
    l = new TupleList();
    cache_[_task_id] = l;
  } else {
    l = iter->second;
  }
  return l;
}

void TupleCache::drain(EventLoop::Status) { drain_impl(); }

void TupleCache::drain_impl() {
  for (auto iter = cache_.begin(); iter != cache_.end(); ++iter) {
    iter->second->drain(iter->first, drainer_);
  }
  total_size_ = 0;
}

TupleCache::TupleList::TupleList() {
  current_ = NULL;
  current_size_ = 0;
  last_drained_count_ = 0;
}

TupleCache::TupleList::~TupleList() {
  CHECK(tuples_.empty());
}

sp_int64 TupleCache::TupleList::add_data_tuple(const proto::api::StreamId& _streamid,
                                               proto::system::HeronDataTuple* _tuple,
                                               sp_uint64* _total_size,
                                               sp_uint64* _tuples_cache_max_tuple_size) {
  if (!current_ || current_->has_control() || current_->data().stream().id() != _streamid.id() ||
      current_->data().stream().component_name() != _streamid.component_name() ||
      current_size_ > *_tuples_cache_max_tuple_size) {
    if (current_) {
      tuples_.push_front(current_);
    }
    current_ = acquire_clean_set();
    current_->mutable_data()->mutable_stream()->MergeFrom(_streamid);
    current_size_ = 0;
  }

  sp_int64 tuple_key = 0;
  if (_tuple->roots_size() > 0) {
     tuple_key = RandUtils::lrand();
  }
  // Override in place
  _tuple->set_key(tuple_key);

  std::string* added_tuple = current_->mutable_data()->add_tuples();
  _tuple->SerializePartialToString(added_tuple);

  sp_int64 tuple_size = _tuple->GetCachedSize();
  current_size_ += tuple_size;
  *_total_size += tuple_size;
  return tuple_key;
}

void TupleCache::TupleList::add_ack_tuple(const proto::system::AckTuple& _tuple,
                                          sp_uint64* _total_size) {
  if (!current_ || current_->has_data() || current_->control().emits_size() > 0) {
    if (current_) {
      tuples_.push_front(current_);
    }
    current_ = acquire_clean_set();
    current_size_ = 0;
  }
  sp_int64 tuple_size = _tuple.ByteSize();
  current_size_ += tuple_size;
  *_total_size += tuple_size;
  current_->mutable_control()->add_acks()->CopyFrom(_tuple);
}

void TupleCache::TupleList::add_fail_tuple(const proto::system::AckTuple& _tuple,
                                           sp_uint64* _total_size) {
  if (!current_ || current_->has_data() || current_->control().emits_size() > 0) {
    if (current_) {
      tuples_.push_front(current_);
    }
    current_ = acquire_clean_set();
    current_size_ = 0;
  }
  sp_int64 tuple_size = _tuple.ByteSize();
  current_size_ += tuple_size;
  *_total_size += tuple_size;
  current_->mutable_control()->add_fails()->CopyFrom(_tuple);
}

void TupleCache::TupleList::add_emit_tuple(const proto::system::AckTuple& _tuple,
                                           sp_uint64* _total_size) {
  if (!current_ || current_->has_data() || current_->control().acks_size() > 0 ||
      current_->control().fails_size() > 0) {
    if (current_) {
      tuples_.push_front(current_);
    }
    current_ = acquire_clean_set();
    current_size_ = 0;
  }
  sp_int64 tuple_size = _tuple.ByteSize();
  current_size_ += tuple_size;
  *_total_size += tuple_size;
  current_->mutable_control()->add_emits()->CopyFrom(_tuple);
}

void TupleCache::TupleList::drain(
    sp_int32 _task_id, std::function<void(sp_int32, proto::system::HeronTupleSet2*)> _drainer) {
  sp_int32 drained = 0;
  // we have to drain from back
  while (!tuples_.empty()) {
    _drainer(_task_id, tuples_.back());  // Drain cleans up the structure
    tuples_.pop_back();
    drained++;
  }
  if (current_ && drained == 0 && last_drained_count_ == 0) {
    // We didn;t drain anything last time. Better do it now
    // TODO(vikasr) : Add metric
    _drainer(_task_id, current_);
    drained++;
    current_ = NULL;
    current_size_ = 0;
  }
  last_drained_count_ = drained;
}
}  // namespace stmgr
}  // namespace heron
