/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#ifndef __HIERARCHICAL_ALLOCATOR_PROCESS_HPP__
#define __HIERARCHICAL_ALLOCATOR_PROCESS_HPP__

#include <algorithm>
#include <vector>

#include <mesos/resources.hpp>

#include <process/delay.hpp>
#include <process/id.hpp>
#include <process/timeout.hpp>

#include <stout/check.hpp>
#include <stout/duration.hpp>
#include <stout/hashmap.hpp>
#include <stout/stopwatch.hpp>
#include <stout/stringify.hpp>

#include "master/allocator.hpp"
#include "master/drf_sorter.hpp"
#include "master/master.hpp"
#include "master/sorter.hpp"

namespace mesos {
namespace internal {
namespace master {
namespace allocator {

// Forward declarations.
class Filter;


// We forward declare the hierarchical allocator process so that we
// can typedef an instantiation of it with DRF sorters.
template <typename RoleSorter, typename FrameworkSorter>
class HierarchicalAllocatorProcess;

typedef HierarchicalAllocatorProcess<DRFSorter, DRFSorter>
HierarchicalDRFAllocatorProcess;


struct Slave
{
  Slave() {}

  explicit Slave(const SlaveInfo& _info)
    : available(_info.resources()),
      activated(true),
      whitelisted(false),
      checkpoint(_info.checkpoint()),
      info(_info) {}

  Resources resources() const { return info.resources(); }

  std::string hostname() const { return info.hostname(); }

  // Contains all of the resources currently free on this slave.
  Resources available;

  // Whether the slave is activated. Resources are not offered for
  // deactivated slaves until they are reactivated.
  bool activated;

  // Indicates if the resources on this slave should be offered to
  // frameworks.
  bool whitelisted;

  bool checkpoint;
private:
  SlaveInfo info;
};


struct Framework
{
  Framework() {}

  explicit Framework(const FrameworkInfo& _info)
    : checkpoint(_info.checkpoint()),
      info(_info) {}

  std::string role() const { return info.role(); }

  // Filters that have been added by this framework.
  hashset<Filter*> filters;

  bool checkpoint;
private:
  FrameworkInfo info;
};


// Implements the basic allocator algorithm - first pick a role by
// some criteria, then pick one of their frameworks to allocate to.
template <typename RoleSorter, typename FrameworkSorter>
class HierarchicalAllocatorProcess : public AllocatorProcess
{
public:
  HierarchicalAllocatorProcess();

  virtual ~HierarchicalAllocatorProcess();

  process::PID<HierarchicalAllocatorProcess> self();

  void initialize(
      const Flags& flags,
      const process::PID<Master>& _master,
      const hashmap<std::string, RoleInfo>& _roles);

  void frameworkAdded(
      const FrameworkID& frameworkId,
      const FrameworkInfo& frameworkInfo,
      const Resources& used);

  void frameworkRemoved(
      const FrameworkID& frameworkId);

  void frameworkActivated(
      const FrameworkID& frameworkId,
      const FrameworkInfo& frameworkInfo);

  void frameworkDeactivated(
      const FrameworkID& frameworkId);

  void slaveAdded(
      const SlaveID& slaveId,
      const SlaveInfo& slaveInfo,
      const hashmap<FrameworkID, Resources>& used);

  void slaveRemoved(
      const SlaveID& slaveId);

  void slaveDeactivated(
      const SlaveID& slaveId);

  void slaveActivated(
      const SlaveID& slaveId);

  void updateWhitelist(
      const Option<hashset<std::string> >& whitelist);

  void resourcesRequested(
      const FrameworkID& frameworkId,
      const std::vector<Request>& requests);

  void resourcesRecovered(
      const FrameworkID& frameworkId,
      const SlaveID& slaveId,
      const Resources& resources,
      const Option<Filters>& filters);

  void offersRevived(
      const FrameworkID& frameworkId);

protected:
  // Useful typedefs for dispatch/delay/defer to self()/this.
  typedef HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter> Self;
  typedef HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter> This;

  // Callback for doing batch allocations.
  void batch();

  // Allocate any allocatable resources.
  void allocate();

  // Allocate resources just from the specified slave.
  void allocate(const SlaveID& slaveId);

  // Allocate resources from the specified slaves.
  void allocate(const hashset<SlaveID>& slaveIds);

  // Remove a filter for the specified framework.
  void expire(const FrameworkID& frameworkId, Filter* filter);

  // Checks whether the slave is whitelisted.
  bool isWhitelisted(const SlaveID& slave);

  // Returns true if there is a filter for this framework
  // on this slave.
  bool isFiltered(
      const FrameworkID& frameworkId,
      const SlaveID& slaveId,
      const Resources& resources);

  bool allocatable(const Resources& resources);

  bool initialized;

  Flags flags;
  process::PID<Master> master;

  // Contains all frameworks.
  hashmap<FrameworkID, Framework> frameworks;

  // Maps role names to the Sorter object which contains
  // all of that role's frameworks.
  hashmap<std::string, FrameworkSorter*> sorters;

  // Contains all active slaves.
  hashmap<SlaveID, Slave> slaves;

  hashmap<std::string, RoleInfo> roles;

  // Slaves to send offers for.
  Option<hashset<std::string> > whitelist;

  // Sorter containing all active roles.
  RoleSorter* roleSorter;
};


// Used to represent "filters" for resources unused in offers.
class Filter
{
public:
  virtual ~Filter() {}

  virtual bool filter(const SlaveID& slaveId, const Resources& resources) = 0;
};


class RefusedFilter: public Filter
{
public:
  RefusedFilter(
      const SlaveID& _slaveId,
      const Resources& _resources,
      const process::Timeout& _timeout)
    : slaveId(_slaveId), resources(_resources), timeout(_timeout) {}

  virtual bool filter(const SlaveID& slaveId, const Resources& resources)
  {
    return slaveId == this->slaveId &&
           resources <= this->resources && // Refused resources are superset.
           timeout.remaining() > Seconds(0);
  }

  const SlaveID slaveId;
  const Resources resources;
  const process::Timeout timeout;
};


template <class RoleSorter, class FrameworkSorter>
HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::HierarchicalAllocatorProcess() // NOLINT(whitespace/line_length)
  : ProcessBase(process::ID::generate("hierarchical-allocator")),
    initialized(false) {}


template <class RoleSorter, class FrameworkSorter>
HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::~HierarchicalAllocatorProcess() // NOLINT(whitespace/line_length)
{}


template <class RoleSorter, class FrameworkSorter>
process::PID<HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter> >
HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::self()
{
  return process::PID<HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter> >(this); // NOLINT(whitespace/line_length)
}


template <class RoleSorter, class FrameworkSorter>
void
HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::initialize(
    const Flags& _flags,
    const process::PID<Master>& _master,
    const hashmap<std::string, RoleInfo>& _roles)
{
  flags = _flags;
  master = _master;
  roles = _roles;
  initialized = true;

  roleSorter = new RoleSorter();
  foreachpair (const std::string& name, const RoleInfo& roleInfo, roles) {
    roleSorter->add(name, roleInfo.weight());
    sorters[name] = new FrameworkSorter();
  }

  VLOG(1) << "Initializing hierarchical allocator process "
          << "with master : " << master;

  delay(flags.allocation_interval, self(), &Self::batch);
}


template <class RoleSorter, class FrameworkSorter>
void
HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::frameworkAdded(
    const FrameworkID& frameworkId,
    const FrameworkInfo& frameworkInfo,
    const Resources& used)
{
  CHECK(initialized);

  const std::string& role = frameworkInfo.role();

  CHECK(roles.contains(role));

  CHECK(!sorters[role]->contains(frameworkId.value()));
  sorters[role]->add(frameworkId.value());

  // Update the allocation to this framework.
  roleSorter->allocated(role, used);
  sorters[role]->add(used);
  sorters[role]->allocated(frameworkId.value(), used);

  frameworks[frameworkId] = Framework(frameworkInfo);

  LOG(INFO) << "Added framework " << frameworkId;

  allocate();
}


template <class RoleSorter, class FrameworkSorter>
void
HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::frameworkRemoved(
    const FrameworkID& frameworkId)
{
  CHECK(initialized);

  CHECK(frameworks.contains(frameworkId));
  const std::string& role = frameworks[frameworkId].role();

  // Might not be in 'sorters[role]' because it was previously
  // deactivated and never re-added.
  if (sorters[role]->contains(frameworkId.value())) {
    Resources allocation = sorters[role]->allocation(frameworkId.value());
    roleSorter->unallocated(role, allocation);
    sorters[role]->remove(allocation);
    sorters[role]->remove(frameworkId.value());
  }

  // Do not delete the filters contained in this
  // framework's 'filters' hashset yet, see comments in
  // HierarchicalAllocatorProcess::offersRevived and
  // HierarchicalAllocatorProcess::expire.
  frameworks.erase(frameworkId);

  LOG(INFO) << "Removed framework " << frameworkId;
}


template <class RoleSorter, class FrameworkSorter>
void
HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::frameworkActivated(
    const FrameworkID& frameworkId,
    const FrameworkInfo& frameworkInfo)
{
  CHECK(initialized);

  const std::string& role = frameworkInfo.role();
  sorters[role]->activate(frameworkId.value());

  LOG(INFO) << "Activated framework " << frameworkId;

  allocate();
}


template <class RoleSorter, class FrameworkSorter>
void
HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::frameworkDeactivated(
    const FrameworkID& frameworkId)
{
  CHECK(initialized);

  CHECK(frameworks.contains(frameworkId));
  const std::string& role = frameworks[frameworkId].role();

  sorters[role]->deactivate(frameworkId.value());

  // Note that the Sorter *does not* remove the resources allocated
  // to this framework. For now, this is important because if the
  // framework fails over and is activated, we still want a record
  // of the resources that it is using. We might be able to collapse
  // the added/removed and activated/deactivated in the future.

  // Do not delete the filters contained in this
  // framework's 'filters' hashset yet, see comments in
  // HierarchicalAllocatorProcess::offersRevived and
  // HierarchicalAllocatorProcess::expire.
  frameworks[frameworkId].filters.clear();

  LOG(INFO) << "Deactivated framework " << frameworkId;
}


template <class RoleSorter, class FrameworkSorter>
void
HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::slaveAdded(
    const SlaveID& slaveId,
    const SlaveInfo& slaveInfo,
    const hashmap<FrameworkID, Resources>& used)
{
  CHECK(initialized);

  CHECK(!slaves.contains(slaveId));

  slaves[slaveId] = Slave(slaveInfo);
  slaves[slaveId].whitelisted = isWhitelisted(slaveId);

  roleSorter->add(slaveInfo.resources());

  Resources unused = slaveInfo.resources();

  foreachpair (const FrameworkID& frameworkId,
               const Resources& resources,
               used) {
    if (frameworks.contains(frameworkId)) {
      const std::string& role = frameworks[frameworkId].role();
      sorters[role]->add(resources);
      sorters[role]->allocated(frameworkId.value(), resources);
      roleSorter->allocated(role, resources);
    }

    unused -= resources; // Only want to allocate resources that are not used!
  }

  slaves[slaveId].available = unused;

  LOG(INFO) << "Added slave " << slaveId << " (" << slaveInfo.hostname()
            << ") with " << slaveInfo.resources() << " (and " << unused
            << " available)";

  allocate(slaveId);
}


template <class RoleSorter, class FrameworkSorter>
void
HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::slaveRemoved(
    const SlaveID& slaveId)
{
  CHECK(initialized);
  CHECK(slaves.contains(slaveId));

  roleSorter->remove(slaves[slaveId].resources());

  slaves.erase(slaveId);

  // Note that we DO NOT actually delete any filters associated with
  // this slave, that will occur when the delayed
  // HierarchicalAllocatorProcess::expire gets invoked (or the framework
  // that applied the filters gets removed).

  LOG(INFO) << "Removed slave " << slaveId;
}


template <class RoleSorter, class FrameworkSorter>
void
HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::slaveDeactivated(
    const SlaveID& slaveId)
{
  CHECK(initialized);
  CHECK(slaves.contains(slaveId));

  slaves[slaveId].activated = false;

  LOG(INFO) << "Slave " << slaveId << " deactivated";
}


template <class RoleSorter, class FrameworkSorter>
void
HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::slaveActivated(
    const SlaveID& slaveId)
{
  CHECK(initialized);
  CHECK(slaves.contains(slaveId));

  slaves[slaveId].activated = true;

  LOG(INFO)<< "Slave " << slaveId << " reactivated";
}


template <class RoleSorter, class FrameworkSorter>
void
HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::updateWhitelist(
    const Option<hashset<std::string> >& _whitelist)
{
  CHECK(initialized);

  whitelist = _whitelist;

  if (whitelist.isSome()) {
    LOG(INFO) << "Updated slave white list: " << stringify(whitelist.get());

    foreachkey (const SlaveID& slaveId, slaves) {
      slaves[slaveId].whitelisted = isWhitelisted(slaveId);
    }
  }
}


template <class RoleSorter, class FrameworkSorter>
void
HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::resourcesRequested(
    const FrameworkID& frameworkId,
    const std::vector<Request>& requests)
{
  CHECK(initialized);

  LOG(INFO) << "Received resource request from framework " << frameworkId;
}


template <class RoleSorter, class FrameworkSorter>
void
HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::resourcesRecovered(
    const FrameworkID& frameworkId,
    const SlaveID& slaveId,
    const Resources& resources,
    const Option<Filters>& filters)
{
  CHECK(initialized);

  if (resources.allocatable().size() == 0) {
    return;
  }

  // Updated resources allocated to framework (if framework still
  // exists, which it might not in the event that we dispatched
  // Master::offer before we received AllocatorProcess::frameworkRemoved
  // or AllocatorProcess::frameworkDeactivated, in which case we will
  // have already recovered all of its resources).
  if (frameworks.contains(frameworkId) &&
      sorters[frameworks[frameworkId].role()]->contains(frameworkId.value())) {
    const std::string& role = frameworks[frameworkId].role();
    sorters[role]->unallocated(frameworkId.value(), resources);
    sorters[role]->remove(resources);
    roleSorter->unallocated(role, resources);
  }

  // Update resources allocatable on slave (if slave still exists,
  // which it might not in the event that we dispatched Master::offer
  // before we received Allocator::slaveRemoved).
  if (slaves.contains(slaveId)) {
    slaves[slaveId].available += resources;

    LOG(INFO) << "Recovered " << resources.allocatable()
              << " (total allocatable: " << slaves[slaveId].available
              << ") on slave " << slaveId
              << " from framework " << frameworkId;
  }

  // No need to install the filter if 'filters' is none.
  if (filters.isNone()) {
    return;
  }

  // No need to install the filter if slave/framework does not exist.
  if (!frameworks.contains(frameworkId) || !slaves.contains(slaveId)) {
    return;
  }

  // Create a refused resources filter.
  Try<Duration> seconds = Duration::create(filters.get().refuse_seconds());

  if (seconds.isError()) {
    LOG(WARNING) << "Using the default value of 'refuse_seconds' to create "
                 << "the refused resources filter because the input value "
                 << "is invalid: " << seconds.error();

    seconds = Duration::create(Filters().refuse_seconds());
  } else if (seconds.get() < Duration::zero()) {
    LOG(WARNING) << "Using the default value of 'refuse_seconds' to create "
                 << "the refused resources filter because the input value "
                 << "is negative";

    seconds = Duration::create(Filters().refuse_seconds());
  }

  CHECK_SOME(seconds);

  if (seconds.get() != Duration::zero()) {
    VLOG(1) << "Framework " << frameworkId
            << " filtered slave " << slaveId
            << " for " << seconds.get();

    // Create a new filter and delay its expiration.
    Filter* filter = new RefusedFilter(
        slaveId,
        resources,
        process::Timeout::in(seconds.get()));

    frameworks[frameworkId].filters.insert(filter);

    delay(seconds.get(), self(), &Self::expire, frameworkId, filter);
  }
}


template <class RoleSorter, class FrameworkSorter>
void
HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::offersRevived(
    const FrameworkID& frameworkId)
{
  CHECK(initialized);

  frameworks[frameworkId].filters.clear();

  // We delete each actual Filter when
  // HierarchicalAllocatorProcess::expire gets invoked. If we delete the
  // Filter here it's possible that the same Filter (i.e., same
  // address) could get reused and HierarchicalAllocatorProcess::expire
  // would expire that filter too soon. Note that this only works
  // right now because ALL Filter types "expire".

  LOG(INFO) << "Removed filters for framework " << frameworkId;

  allocate();
}


template <class RoleSorter, class FrameworkSorter>
void
HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::batch()
{
  CHECK(initialized);
  allocate();
  delay(flags.allocation_interval, self(), &Self::batch);
}


template <class RoleSorter, class FrameworkSorter>
void
HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::allocate()
{
  CHECK(initialized);

  Stopwatch stopwatch;
  stopwatch.start();

  allocate(slaves.keys());

  VLOG(1) << "Performed allocation for " << slaves.size() << " slaves in "
            << stopwatch.elapsed();
}


template <class RoleSorter, class FrameworkSorter>
void
HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::allocate(
    const SlaveID& slaveId)
{
  CHECK(initialized);

  hashset<SlaveID> slaveIds;
  slaveIds.insert(slaveId);

  Stopwatch stopwatch;
  stopwatch.start();

  allocate(slaveIds);

  VLOG(1) << "Performed allocation for slave " << slaveId << " in "
          << stopwatch.elapsed();
}


template <class RoleSorter, class FrameworkSorter>
void
HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::allocate(
    const hashset<SlaveID>& slaveIds_)
{
  CHECK(initialized);

  if (roleSorter->count() == 0) {
    VLOG(1) << "No roles to allocate resources!";
    return;
  }

  if (slaveIds_.empty()) {
    VLOG(1) << "No resources available to allocate!";
    return;
  }

  // Randomize the order in which slaves' resources are allocated.
  // TODO(vinod): Implement a smarter sorting algorithm.
  std::vector<SlaveID> slaveIds(slaveIds_.begin(), slaveIds_.end());
  std::random_shuffle(slaveIds.begin(), slaveIds.end());

  hashmap<FrameworkID, hashmap<SlaveID, Resources> > offerable;
  foreach (const SlaveID& slaveId, slaveIds) {
    // If the slave is not activated or whitelisted, ignore it.
    if (!slaves[slaveId].activated || !slaves[slaveId].whitelisted) {
      continue;
    }

    foreach (const std::string& role, roleSorter->sort()) {
      foreach (const std::string& frameworkIdValue, sorters[role]->sort()) {
        FrameworkID frameworkId;
        frameworkId.set_value(frameworkIdValue);

        Resources unreserved = slaves[slaveId].available.extract("*");
        Resources resources = unreserved;
        if (role != "*") {
          resources += slaves[slaveId].available.extract(role);
        }

        // If the resources are not allocatable, ignore.
        if (!allocatable(resources)) {
          continue;
        }

        // If the framework filters these resources, ignore.
        if (isFiltered(frameworkId, slaveId, resources)) {
          continue;
        }

        VLOG(1)
          << "Offering " << resources << " on slave " << slaveId
          << " to framework " << frameworkId;

        offerable[frameworkId][slaveId] = resources;

        // Update slave resources.
        slaves[slaveId].available -= resources;

        // Update the sorters.
        // We only count resources not reserved for this role
        // in the share the sorter considers.
        sorters[role]->add(unreserved);
        sorters[role]->allocated(frameworkIdValue, unreserved);
        roleSorter->allocated(role, unreserved);
      }
    }
  }

  // Now offer the resources to each framework.
  foreachkey (const FrameworkID& frameworkId, offerable) {
    dispatch(master, &Master::offer, frameworkId, offerable[frameworkId]);
  }
}


template <class RoleSorter, class FrameworkSorter>
void
HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::expire(
    const FrameworkID& frameworkId,
    Filter* filter)
{
  // The filter might have already been removed (e.g., if the
  // framework no longer exists or in
  // HierarchicalAllocatorProcess::offersRevived) but not yet deleted (to
  // keep the address from getting reused possibly causing premature
  // expiration).
  if (frameworks.contains(frameworkId) &&
      frameworks[frameworkId].filters.contains(filter)) {
    frameworks[frameworkId].filters.erase(filter);
  }

  delete filter;
}


template <class RoleSorter, class FrameworkSorter>
bool
HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::isWhitelisted(
    const SlaveID& slaveId)
{
  CHECK(initialized);

  CHECK(slaves.contains(slaveId));

  return whitelist.isNone() ||
         whitelist.get().contains(slaves[slaveId].hostname());
}


template <class RoleSorter, class FrameworkSorter>
bool
HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::isFiltered(
    const FrameworkID& frameworkId,
    const SlaveID& slaveId,
    const Resources& resources)
{
  CHECK(frameworks.contains(frameworkId));
  CHECK(slaves.contains(slaveId));

  // Do not offer a non-checkpointing slave's resources to a checkpointing
  // framework. This is a short term fix until the following is resolved:
  // https://issues.apache.org/jira/browse/MESOS-444.
  if (frameworks[frameworkId].checkpoint && !slaves[slaveId].checkpoint) {
    VLOG(1) << "Filtered " << resources
            << " on non-checkpointing slave " << slaveId
            << " for checkpointing framework " << frameworkId;
    return true;
  }

  foreach (Filter* filter, frameworks[frameworkId].filters) {
    if (filter->filter(slaveId, resources)) {
      VLOG(1) << "Filtered " << resources
              << " on slave " << slaveId
              << " for framework " << frameworkId;
      return true;
    }
  }
  return false;
}


template <class RoleSorter, class FrameworkSorter>
bool
HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::allocatable(
    const Resources& resources)
{
  // TODO(benh): For now, only make offers when there is some cpu
  // and memory left. This is an artifact of the original code that
  // only offered when there was at least 1 cpu "unit" available,
  // and without doing this a framework might get offered resources
  // with only memory available (which it obviously will decline)
  // and then end up waiting the default Filters::refuse_seconds
  // (unless the framework set it to something different).

  Option<double> cpus = resources.cpus();
  Option<Bytes> mem = resources.mem();

  if (cpus.isSome() && mem.isSome()) {
    return cpus.get() >= MIN_CPUS && mem.get() > MIN_MEM;
  }

  return false;
}

} // namespace allocator {
} // namespace master {
} // namespace internal {
} // namespace mesos {

#endif // __HIERARCHICAL_ALLOCATOR_PROCESS_HPP__
