// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <mesos/agent/agent.hpp>

#include <mesos/master/master.hpp>

#include <mesos/v1/agent/agent.hpp>

#include <mesos/v1/master/master.hpp>

#include <process/pid.hpp>

#include <stout/check.hpp>
#include <stout/json.hpp>
#include <stout/protobuf.hpp>

#include "internal/evolve.hpp"

#include "master/constants.hpp"

using std::string;

using google::protobuf::RepeatedPtrField;

using process::UPID;

namespace mesos {
namespace internal {

// Helper for evolving a type by serializing/parsing when the types
// have not changed across versions.
template <typename T>
static T evolve(const google::protobuf::Message& message)
{
  T t;

  string data;

  // NOTE: We need to use 'SerializePartialToString' instead of
  // 'SerializeToString' because some required fields might not be set
  // and we don't want an exception to get thrown.
  CHECK(message.SerializePartialToString(&data))
    << "Failed to serialize " << message.GetTypeName()
    << " while evolving to " << t.GetTypeName();

  // NOTE: We need to use 'ParsePartialFromString' instead of
  // 'ParseFromString' because some required fields might not
  // be set and we don't want an exception to get thrown.
  CHECK(t.ParsePartialFromString(data))
    << "Failed to parse " << t.GetTypeName()
    << " while evolving from " << message.GetTypeName();

  return t;
}


v1::AgentID evolve(const SlaveID& slaveId)
{
  // NOTE: Not using 'evolve<SlaveID, v1::AgentID>(slaveId)' since
  // this will be a common 'evolve' call and we wanted to speed up
  // performance.

  v1::AgentID id;
  id.set_value(slaveId.value());
  return id;
}


v1::AgentInfo evolve(const SlaveInfo& slaveInfo)
{
  return evolve<v1::AgentInfo>(slaveInfo);
}


v1::ContainerInfo evolve(const ContainerInfo& containerInfo)
{
  return evolve<v1::ContainerInfo>(containerInfo);
}


v1::DomainInfo evolve(const DomainInfo& domainInfo)
{
  return evolve<v1::DomainInfo>(domainInfo);
}


v1::DrainInfo evolve(const DrainInfo& drainInfo)
{
  return evolve<v1::DrainInfo>(drainInfo);
}


v1::ExecutorID evolve(const ExecutorID& executorId)
{
  return evolve<v1::ExecutorID>(executorId);
}


v1::ExecutorInfo evolve(const ExecutorInfo& executorInfo)
{
  return evolve<v1::ExecutorInfo>(executorInfo);
}


v1::FileInfo evolve(const FileInfo& fileInfo)
{
  return evolve<v1::FileInfo>(fileInfo);
}


v1::FrameworkID evolve(const FrameworkID& frameworkId)
{
  return evolve<v1::FrameworkID>(frameworkId);
}


v1::FrameworkInfo evolve(const FrameworkInfo& frameworkInfo)
{
  return evolve<v1::FrameworkInfo>(frameworkInfo);
}


v1::InverseOffer evolve(const InverseOffer& inverseOffer)
{
  return evolve<v1::InverseOffer>(inverseOffer);
}


v1::KillPolicy evolve(const KillPolicy& killPolicy)
{
  return evolve<v1::KillPolicy>(killPolicy);
}


v1::MachineID evolve(const MachineID& machineId)
{
  return evolve<v1::MachineID>(machineId);
}


v1::MasterInfo evolve(const MasterInfo& masterInfo)
{
  return evolve<v1::MasterInfo>(masterInfo);
}


v1::Offer evolve(const Offer& offer)
{
  return evolve<v1::Offer>(offer);
}


v1::OfferID evolve(const OfferID& offerId)
{
  return evolve<v1::OfferID>(offerId);
}


v1::OperationStatus evolve(const OperationStatus& status)
{
  v1::OperationStatus _status = evolve<v1::OperationStatus>(status);

  if (status.has_slave_id()) {
    *_status.mutable_agent_id() = evolve<v1::AgentID>(status.slave_id());
  }

  return _status;
}


v1::Resource evolve(const Resource& resource)
{
  return evolve<v1::Resource>(resource);
}


v1::ResourceProviderID evolve(
    const ResourceProviderID& resourceProviderId)
{
  // NOTE: We do not use the common 'devolve' call for performance.
  v1::ResourceProviderID id;
  id.set_value(resourceProviderId.value());
  return id;
}


v1::Resources evolve(const Resources& resources)
{
  return evolve<v1::Resource>(
      static_cast<const RepeatedPtrField<Resource>&>(resources));
}


v1::Task evolve(const Task& task)
{
  return evolve<v1::Task>(task);
}


v1::TaskID evolve(const TaskID& taskId)
{
  return evolve<v1::TaskID>(taskId);
}


v1::TaskInfo evolve(const TaskInfo& taskInfo)
{
  return evolve<v1::TaskInfo>(taskInfo);
}


v1::TaskStatus evolve(const TaskStatus& status)
{
  return evolve<v1::TaskStatus>(status);
}


v1::UUID evolve(const UUID& uuid)
{
  return evolve<v1::UUID>(uuid);
}


v1::agent::Call evolve(const agent::Call& call)
{
  return evolve<v1::agent::Call>(call);
}


v1::agent::ProcessIO evolve(const agent::ProcessIO& processIO)
{
  return evolve<v1::agent::ProcessIO>(processIO);
}


v1::agent::Response evolve(const agent::Response& response)
{
  return evolve<v1::agent::Response>(response);
}


v1::maintenance::ClusterStatus evolve(const maintenance::ClusterStatus& status)
{
  return evolve<v1::maintenance::ClusterStatus>(status);
}


v1::maintenance::Schedule evolve(const maintenance::Schedule& schedule)
{
  return evolve<v1::maintenance::Schedule>(schedule);
}


v1::master::Response evolve(const mesos::master::Response& response)
{
  return evolve<v1::master::Response>(response);
}


v1::resource_provider::Call evolve(const resource_provider::Call& call)
{
  return evolve<v1::resource_provider::Call>(call);
}


v1::resource_provider::Event evolve(const resource_provider::Event& event)
{
  return evolve<v1::resource_provider::Event>(event);
}


// TODO(xujyan): Do we need this conversion when Mesos never sends out
// `scheduler::Call` thus never needs to evovle internal call to a v1 call?
// Perhaps we should remove the method so there's no need to maintain it.
v1::scheduler::Call evolve(const scheduler::Call& call)
{
  v1::scheduler::Call _call = evolve<v1::scheduler::Call>(call);

  // Certain conversions require special handling.
  if (_call.type() == v1::scheduler::Call::SUBSCRIBE) {
    // v1 Subscribe.suppressed_roles cannot be automatically converted
    // because its tag is used by another field in the internal Subscribe.
    *(_call.mutable_subscribe()->mutable_suppressed_roles()) =
      call.subscribe().suppressed_roles();
  }

  return _call;
}


v1::scheduler::Event evolve(const scheduler::Event& event)
{
  return evolve<v1::scheduler::Event>(event);
}


v1::scheduler::Event evolve(const ExitedExecutorMessage& message)
{
  v1::scheduler::Event event;
  event.set_type(v1::scheduler::Event::FAILURE);

  v1::scheduler::Event::Failure* failure = event.mutable_failure();
  *failure->mutable_agent_id() = evolve(message.slave_id());
  *failure->mutable_executor_id() = evolve(message.executor_id());
  failure->set_status(message.status());

  return event;
}


v1::scheduler::Event evolve(const ExecutorToFrameworkMessage& message)
{
  v1::scheduler::Event event;
  event.set_type(v1::scheduler::Event::MESSAGE);

  v1::scheduler::Event::Message* message_ = event.mutable_message();
  *message_->mutable_agent_id() = evolve(message.slave_id());
  *message_->mutable_executor_id() = evolve(message.executor_id());
  message_->set_data(message.data());

  return event;
}


v1::scheduler::Event evolve(const FrameworkErrorMessage& message)
{
  v1::scheduler::Event event;
  event.set_type(v1::scheduler::Event::ERROR);

  v1::scheduler::Event::Error* error = event.mutable_error();
  error->set_message(message.message());

  return event;
}


v1::scheduler::Event evolve(const FrameworkRegisteredMessage& message)
{
  v1::scheduler::Event event;
  event.set_type(v1::scheduler::Event::SUBSCRIBED);

  v1::scheduler::Event::Subscribed* subscribed = event.mutable_subscribed();
  *subscribed->mutable_framework_id() = evolve(message.framework_id());

  // TODO(anand): The master should pass the heartbeat interval as an argument
  // to `evolve()`.
  subscribed->set_heartbeat_interval_seconds(
      master::DEFAULT_HEARTBEAT_INTERVAL.secs());

  *subscribed->mutable_master_info() = evolve(message.master_info());

  return event;
}


v1::scheduler::Event evolve(const FrameworkReregisteredMessage& message)
{
  v1::scheduler::Event event;
  event.set_type(v1::scheduler::Event::SUBSCRIBED);

  v1::scheduler::Event::Subscribed* subscribed = event.mutable_subscribed();
  *subscribed->mutable_framework_id() = evolve(message.framework_id());

  // TODO(anand): The master should pass the heartbeat interval as an argument
  // to `evolve()`.
  subscribed->set_heartbeat_interval_seconds(
      master::DEFAULT_HEARTBEAT_INTERVAL.secs());

  *subscribed->mutable_master_info() = evolve(message.master_info());

  return event;
}


v1::scheduler::Event evolve(const InverseOffersMessage& message)
{
  v1::scheduler::Event event;
  event.set_type(v1::scheduler::Event::INVERSE_OFFERS);

  v1::scheduler::Event::InverseOffers* inverse_offers =
    event.mutable_inverse_offers();

  *inverse_offers->mutable_inverse_offers() =
    evolve<v1::InverseOffer>(message.inverse_offers());

  return event;
}


v1::scheduler::Event evolve(const LostSlaveMessage& message)
{
  v1::scheduler::Event event;
  event.set_type(v1::scheduler::Event::FAILURE);

  v1::scheduler::Event::Failure* failure = event.mutable_failure();
  *failure->mutable_agent_id() = evolve(message.slave_id());

  return event;
}


v1::scheduler::Event evolve(const ResourceOffersMessage& message)
{
  v1::scheduler::Event event;
  event.set_type(v1::scheduler::Event::OFFERS);

  v1::scheduler::Event::Offers* offers = event.mutable_offers();
  *offers->mutable_offers() = evolve<v1::Offer>(message.offers());

  return event;
}


v1::scheduler::Event evolve(const RescindInverseOfferMessage& message)
{
  v1::scheduler::Event event;
  event.set_type(v1::scheduler::Event::RESCIND_INVERSE_OFFER);

  v1::scheduler::Event::RescindInverseOffer* rescindInverseOffer =
    event.mutable_rescind_inverse_offer();

  *rescindInverseOffer->mutable_inverse_offer_id() =
      evolve(message.inverse_offer_id());

  return event;
}


v1::scheduler::Event evolve(const RescindResourceOfferMessage& message)
{
  v1::scheduler::Event event;
  event.set_type(v1::scheduler::Event::RESCIND);

  v1::scheduler::Event::Rescind* rescind = event.mutable_rescind();

  *rescind->mutable_offer_id() = evolve(message.offer_id());

  return event;
}


v1::scheduler::Event evolve(const StatusUpdateMessage& message)
{
  v1::scheduler::Event event;
  event.set_type(v1::scheduler::Event::UPDATE);

  v1::scheduler::Event::Update* update = event.mutable_update();

  *update->mutable_status() = evolve(message.update().status());

  if (message.update().has_slave_id()) {
    *update->mutable_status()->mutable_agent_id() =
      evolve(message.update().slave_id());
  }

  if (message.update().has_executor_id()) {
    *update->mutable_status()->mutable_executor_id() =
      evolve(message.update().executor_id());
  }

  update->mutable_status()->set_timestamp(message.update().timestamp());

  // If the update does not have a 'uuid', it does not need
  // acknowledging. However, prior to 0.23.0, the update uuid
  // was required and always set. In 0.24.0, we can rely on the
  // update uuid check here, until then we must still check for
  // this being sent from the driver (from == UPID()) or from
  // the master (pid == UPID()).
  // TODO(vinod): Get rid of this logic in 0.25.0 because master
  // and slave correctly set task status in 0.24.0.
  if (!message.update().has_uuid() || message.update().uuid() == "") {
    update->mutable_status()->clear_uuid();
  } else if (UPID(message.pid()) == UPID()) {
    update->mutable_status()->clear_uuid();
  } else {
    update->mutable_status()->set_uuid(message.update().uuid());
  }

  return event;
}


v1::scheduler::Event evolve(const UpdateOperationStatusMessage& message)
{
  v1::scheduler::Event event;
  event.set_type(v1::scheduler::Event::UPDATE_OPERATION_STATUS);
  *event.mutable_update_operation_status()->mutable_status() =
    evolve(message.status());

  return event;
}


v1::executor::Call evolve(const executor::Call& call)
{
  return evolve<v1::executor::Call>(call);
}


v1::executor::Event evolve(const executor::Event& event)
{
  return evolve<v1::executor::Event>(event);
}


v1::scheduler::Response evolve(const scheduler::Response& response)
{
  return evolve<v1::scheduler::Response>(response);
}


v1::executor::Event evolve(const ExecutorRegisteredMessage& message)
{
  v1::executor::Event event;
  event.set_type(v1::executor::Event::SUBSCRIBED);

  v1::executor::Event::Subscribed* subscribed = event.mutable_subscribed();

  *subscribed->mutable_executor_info() = evolve(message.executor_info());
  *subscribed->mutable_framework_info() = evolve(message.framework_info());
  *subscribed->mutable_agent_info() = evolve(message.slave_info());

  return event;
}


v1::executor::Event evolve(const FrameworkToExecutorMessage& message)
{
  v1::executor::Event event;
  event.set_type(v1::executor::Event::MESSAGE);

  v1::executor::Event::Message* message_ = event.mutable_message();

  message_->set_data(message.data());

  return event;
}


v1::executor::Event evolve(const KillTaskMessage& message)
{
  v1::executor::Event event;
  event.set_type(v1::executor::Event::KILL);

  v1::executor::Event::Kill* kill = event.mutable_kill();

  *kill->mutable_task_id() = evolve(message.task_id());

  if (message.has_kill_policy()) {
    *kill->mutable_kill_policy() = evolve(message.kill_policy());
  }

  return event;
}


v1::executor::Event evolve(const RunTaskMessage& message)
{
  v1::executor::Event event;
  event.set_type(v1::executor::Event::LAUNCH);

  v1::executor::Event::Launch* launch = event.mutable_launch();

  *launch->mutable_task() = evolve(message.task());

  return event;
}


v1::executor::Event evolve(const ShutdownExecutorMessage&)
{
  v1::executor::Event event;
  event.set_type(v1::executor::Event::SHUTDOWN);

  return event;
}


v1::executor::Event evolve(
    const StatusUpdateAcknowledgementMessage& message)
{
  v1::executor::Event event;
  event.set_type(v1::executor::Event::ACKNOWLEDGED);

  v1::executor::Event::Acknowledged* acknowledged =
    event.mutable_acknowledged();

  *acknowledged->mutable_task_id() = evolve(message.task_id());
  acknowledged->set_uuid(message.uuid());

  return event;
}


v1::master::Event evolve(const mesos::master::Event& event)
{
  return evolve<v1::master::Event>(event);
}


template<>
v1::master::Response evolve<v1::master::Response::GET_FLAGS>(
    const JSON::Object& object)
{
  v1::master::Response response;
  response.set_type(v1::master::Response::GET_FLAGS);

  v1::master::Response::GetFlags* getFlags = response.mutable_get_flags();

  Result<JSON::Object> flags = object.at<JSON::Object>("flags");
  CHECK_SOME(flags) << "Failed to find 'flags' key in the JSON object";

  foreachpair (const string& key,
               const JSON::Value& value,
               flags->values) {
    v1::Flag* flag = getFlags->add_flags();
    flag->set_name(key);

    CHECK(value.is<JSON::String>())
      << "Flag '" + key + "' value is not a string";

    flag->set_value(value.as<JSON::String>().value);
  }

  return response;
}


// TODO(vinod): Consolidate master and agent flags evolution.
template<>
v1::agent::Response evolve<v1::agent::Response::GET_FLAGS>(
    const JSON::Object& object)
{
  v1::agent::Response response;
  response.set_type(v1::agent::Response::GET_FLAGS);

  v1::agent::Response::GetFlags* getFlags = response.mutable_get_flags();

  Result<JSON::Object> flags = object.at<JSON::Object>("flags");
  CHECK_SOME(flags) << "Failed to find 'flags' key in the JSON object";

  foreachpair (const string& key,
               const JSON::Value& value,
               flags->values) {
    v1::Flag* flag = getFlags->add_flags();
    flag->set_name(key);

    CHECK(value.is<JSON::String>())
      << "Flag '" + key + "' value is not a string";

    flag->set_value(value.as<JSON::String>().value);
  }

  return response;
}


template<>
v1::master::Response evolve<v1::master::Response::GET_VERSION>(
    const JSON::Object& object)
{
  v1::master::Response response;
  response.set_type(v1::master::Response::GET_VERSION);

  *response.mutable_get_version()->mutable_version_info() =
    CHECK_NOTERROR(::protobuf::parse<v1::VersionInfo>(object));

  return response;
}


// TODO(vinod): Consolidate master and agent version evolution.
template<>
v1::agent::Response evolve<v1::agent::Response::GET_VERSION>(
    const JSON::Object& object)
{
  v1::agent::Response response;
  response.set_type(v1::agent::Response::GET_VERSION);

  *response.mutable_get_version()->mutable_version_info() =
    CHECK_NOTERROR(::protobuf::parse<v1::VersionInfo>(object));

  return response;
}


template<>
v1::agent::Response evolve<v1::agent::Response::GET_CONTAINERS>(
    const JSON::Array& array)
{
  v1::agent::Response response;
  response.set_type(v1::agent::Response::GET_CONTAINERS);

  foreach (const JSON::Value& value, array.values) {
    v1::agent::Response::GetContainers::Container* container =
      response.mutable_get_containers()->add_containers();

    JSON::Object object = value.as<JSON::Object>();

    Result<JSON::String> container_id =
      object.find<JSON::String>("container_id");

    CHECK_SOME(container_id);
    container->mutable_container_id()->set_value(container_id->value);

    Result<JSON::String> framework_id =
      object.find<JSON::String>("framework_id");

    CHECK(!framework_id.isError());
    if (framework_id.isSome()) {
      container->mutable_framework_id()->set_value(framework_id->value);
    }

    Result<JSON::String> executor_id = object.find<JSON::String>("executor_id");

    CHECK(!executor_id.isError());
    if (executor_id.isSome()) {
      container->mutable_executor_id()->set_value(executor_id->value);
    }

    Result<JSON::String> executor_name =
      object.find<JSON::String>("executor_name");

    CHECK(!executor_name.isError());
    if (executor_name.isSome()) {
      container->set_executor_name(executor_name->value);
    }

    Result<JSON::Object> container_status = object.find<JSON::Object>("status");
    if (container_status.isSome()) {
      *container->mutable_container_status() = CHECK_NOTERROR(
          ::protobuf::parse<v1::ContainerStatus>(container_status.get()));
    }

    Result<JSON::Object> resource_statistics =
      object.find<JSON::Object>("statistics");

    if (resource_statistics.isSome()) {
      *container->mutable_resource_statistics() = CHECK_NOTERROR(
          ::protobuf::parse<v1::ResourceStatistics>(resource_statistics.get()));
    }
  }

  return response;
}

} // namespace internal {
} // namespace mesos {
