/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "fwklib/TimeSync.hpp"
#include "fwklib/FwkLog.hpp"
#include "fwklib/PerfFwk.hpp"
#include "fwklib/PaceMeter.hpp"

#include <cstdio>
#include <cstdlib>
#include <ctime>
#include <memory.h>
#include <errno.h>
#include <sys/types.h>

#include "config.h"

#ifndef WIN32
#include <unistd.h>
#include <fcntl.h>
#include <sys/time.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netdb.h>
#include <arpa/inet.h>
#endif

namespace apache {
namespace geode {
namespace client {
namespace testframework {

// Our configuration parameters
#define TIME_SYNC_ADDR "224.0.37.37"
#define TIME_SYNC_PORT 9631
#define TIME_SYNC_TTL 3

// ----------------------------------------------------------------------------

static const short int wrd = 0x0001;
static const char* byt = reinterpret_cast<const char*>(&wrd);

#define isNetworkOrder() (byt[1] == 1)

#define NSWAP_8(x) ((x)&0xff)
#define NSWAP_16(x) ((NSWAP_8(x) << 8) | NSWAP_8((x) >> 8))
#define NSWAP_32(x) ((NSWAP_16(x) << 16) | NSWAP_16((x) >> 16))
#define NSWAP_64(x) ((NSWAP_32(x) << 32) | NSWAP_32((x) >> 32))

long long htonl64(long long value) {
  if (isNetworkOrder()) return value;
  return NSWAP_64(value);
}

long long ntohl64(long long value) {
  if (isNetworkOrder()) return value;
  return NSWAP_64(value);
}

// ----------------------------------------------------------------------------

int32_t TimeSync::svc() {
  if (!m_delta) {
    sendTimeSync();
  } else {
    recvTimeSync();
  }
  return 0;
}

// ----------------------------------------------------------------------------

void TimeSync::sendTimeSync() {
  int32_t sock = (int32_t)socket(AF_INET, SOCK_DGRAM, 0);
  if (sock < 0) {
    FWKSEVERE("Failed to create socket for sending TimeSync messages.  Errno: "
              << errno);
    return;
  }

// To control how many times a packet will be forwarded by routers:
#ifdef _SOLARIS
  unsigned char val = TIME_SYNC_TTL;
#else
  int32_t val = TIME_SYNC_TTL;
#endif
#ifdef WIN32
  int32_t retVal = setsockopt(sock, IPPROTO_IP, IP_MULTICAST_TTL,
                              (const char*)&val, sizeof(val));
#else
  int32_t retVal = setsockopt(sock, IPPROTO_IP, IP_MULTICAST_TTL,
                              (const void*)&val, sizeof(val));
#endif
  if (retVal != 0) {
    FWKSEVERE("Failed to set ttl on socket.  Errno: " << errno);
    return;
  }

  struct sockaddr_in addr;
  socklen_t addrLen = sizeof(struct sockaddr_in);
  memset((void*)&addr, 0, addrLen);
  addr.sin_family = AF_INET;
  addr.sin_addr.s_addr = inet_addr(TIME_SYNC_ADDR);
  addr.sin_port = htons(m_port);

  int32_t tvLen = sizeof(int64_t);
  do {
    ACE_Time_Value tval = ACE_OS::gettimeofday();
    int64_t now = htonl64(timevalMicros(tval));
#ifdef WIN32
    retVal = sendto(sock, (const char*)&now, tvLen, 0,
                    (const struct sockaddr*)&addr, addrLen);
#else
    retVal = sendto(sock, (const void*)&now, tvLen, 0,
                    reinterpret_cast<const struct sockaddr*>(&addr), addrLen);
#endif
    if (retVal == -1) {
      FWKSEVERE("Failed to send TimeSync message.  Errno: " << errno);
    } else {
      if (!m_logged) {
        m_logged = true;
        FWKINFO("Sending time sync messages at " << TIME_SYNC_ADDR << ":"
                                                 << TIME_SYNC_PORT);
      }
    }
    // FWKINFO( "TimeSync sent: " << now );
    perf::sleepSeconds(TIME_SYNC_PAUSE_SECONDS);
  } while (!m_done);
}

// ----------------------------------------------------------------------------

void TimeSync::recvTimeSync() {
  int32_t sock = (int32_t)socket(AF_INET, SOCK_DGRAM, 0);
  if (sock < 0) {
    FWKSEVERE(
        "Failed to create socket for receiving TimeSync messages.  Errno: "
        << errno);
    return;
  }

  // To allow binding multiple applications to the same IP group address:
  int32_t val = 1;
#ifdef WIN32
  int32_t retVal = setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (const char*)&val,
                              sizeof(val));
#else
  int32_t retVal = setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (const void*)&val,
                              sizeof(val));
#endif
  if (retVal != 0) {
    FWKSEVERE("Failed to set socket option SO_REUSEADDR.  Errno: " << errno);
    return;
  }

  struct sockaddr_in addr;
  socklen_t addrLen = sizeof(struct sockaddr_in);
  memset((void*)&addr, 0, addrLen);
  addr.sin_family = AF_INET;
  addr.sin_addr.s_addr = htonl(INADDR_ANY);
  addr.sin_port = htons(m_port);

  retVal = bind(sock, reinterpret_cast<struct sockaddr*>(&addr), addrLen);
  if (retVal != 0) {
    FWKSEVERE(
        "Failed to bind to socket for receiving TimeSync messages.  Errno: "
        << errno);
    return;
  }

  // use setsockopt() to request that the kernel join a multicast group
  struct ip_mreq mreq;
  mreq.imr_multiaddr.s_addr = inet_addr(TIME_SYNC_ADDR);
  mreq.imr_interface.s_addr = htonl(INADDR_ANY);
#ifdef WIN32
  retVal = setsockopt(sock, IPPROTO_IP, IP_ADD_MEMBERSHIP, (const char*)&mreq,
                      sizeof(mreq));
#else
  retVal = setsockopt(sock, IPPROTO_IP, IP_ADD_MEMBERSHIP, (const void*)&mreq,
                      sizeof(mreq));
#endif
  if (retVal != 0) {
    FWKSEVERE("Failed to join multicast group.  Errno: " << errno);
    return;
  }

  int64_t tval;
  int32_t tvLen = sizeof(int64_t);
  struct sockaddr_in sender;
  socklen_t senderLen = sizeof(struct sockaddr_in);
  memset((void*)&sender, 0, senderLen);

  int32_t cnt = 0;
  int64_t total = 0;
  while (!m_done) {
    tval = 0;
#ifdef WIN32
    retVal = recvfrom(sock, (char*)&tval, tvLen, 0, (struct sockaddr*)&sender,
                      &senderLen);
#else
    retVal = recvfrom(sock, (void*)&tval, tvLen, 0,
                      reinterpret_cast<struct sockaddr*>(&sender), &senderLen);
#endif
    switch (retVal) {
      case -1:  // error!
        if ((errno > 0) && (errno != EAGAIN)) {
          FWKSEVERE(
              "Failed while receiving TimeSync message.  Errno: " << errno);
          return;
        }
        break;
      case 8:  // process the sync message
      {
        ACE_Time_Value now = ACE_OS::gettimeofday();
        int64_t curr = timevalMicros(now);
        tval = ntohl64(tval);
        int64_t diff = (tval - curr);
        total += diff;
        cnt++;
        if (cnt == 10) {
          int32_t avg = static_cast<int32_t>(total / cnt);
          if (*m_delta != avg) {
            *m_delta = avg;
            if (m_report) {
              FWKINFO("Current delta is: " << *m_delta);
            }
          }
          cnt = 0;
          total = 0;
        }
        if (!m_logged) {
          m_logged = true;
          FWKINFO("Receiving time sync messages at " << TIME_SYNC_ADDR << ":"
                                                     << TIME_SYNC_PORT);
        }
      } break;
      case 0:   // nothing received, no-op, fall thru
      default:  // junk??
        break;
    }
  }
}

}  // namespace testframework
}  // namespace client
}  // namespace geode
}  // namespace apache
