/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */

#define FD_SETSIZE 2048
#ifndef _WIN32_WINNT
#define _WIN32_WINNT 0x0501
#endif
#if _WIN32_WINNT < 0x0501
#error "Proton requires Windows API support for XP or later."
#endif
#include <winsock2.h>
#include <mswsock.h>
#include <Ws2tcpip.h>

#include "platform.h"
#include <proton/io.h>
#include <proton/object.h>
#include <proton/selector.h>
#include "iocp.h"
#include "util.h"

#include <ctype.h>
#include <errno.h>
#include <stdio.h>
#include <assert.h>

int pni_win32_error(pn_error_t *error, const char *msg, HRESULT code)
{
  // Error code can be from GetLastError or WSAGetLastError,
  char err[1024] = {0};
  FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS |
                FORMAT_MESSAGE_MAX_WIDTH_MASK, NULL, code, 0, (LPSTR)&err, sizeof(err), NULL);
  return pn_error_format(error, PN_ERR, "%s: %s", msg, err);
}

static void io_log(const char *fmt, ...)
{
  va_list ap;
  va_start(ap, fmt);
  vfprintf(stderr, fmt, ap);
  va_end(ap);
  fflush(stderr);
}

struct pn_io_t {
  char host[NI_MAXHOST];
  char serv[NI_MAXSERV];
  pn_error_t *error;
  bool trace;
  bool wouldblock;
  iocp_t *iocp;
};

void pn_io_initialize(void *obj)
{
  pn_io_t *io = (pn_io_t *) obj;
  io->error = pn_error();
  io->wouldblock = false;
  io->trace = pn_env_bool("PN_TRACE_DRV");

  /* Request WinSock 2.2 */
  WORD wsa_ver = MAKEWORD(2, 2);
  WSADATA unused;
  int err = WSAStartup(wsa_ver, &unused);
  if (err) {
    pni_win32_error(io->error, "WSAStartup", WSAGetLastError());
    fprintf(stderr, "Can't load WinSock: %s\n", pn_error_text(io->error));
  }
  io->iocp = pni_iocp();
}

void pn_io_finalize(void *obj)
{
  pn_io_t *io = (pn_io_t *) obj;
  pn_error_free(io->error);
  pn_free(io->iocp);
  WSACleanup();
}

#define pn_io_hashcode NULL
#define pn_io_compare NULL
#define pn_io_inspect

pn_io_t *pn_io(void)
{
  static const pn_class_t clazz = PN_CLASS(pn_io);
  pn_io_t *io = (pn_io_t *) pn_class_new(&clazz, sizeof(pn_io_t));
  return io;
}

void pn_io_free(pn_io_t *io)
{
  pn_free(io);
}

pn_error_t *pn_io_error(pn_io_t *io)
{
  assert(io);
  return io->error;
}

static void ensure_unique(pn_io_t *io, pn_socket_t new_socket)
{
  // A brand new socket can have the same HANDLE value as a previous
  // one after a socketclose.  If the application closes one itself
  // (i.e. not using pn_close), we don't find out about it until here.
  iocpdesc_t *iocpd = pni_iocpdesc_map_get(io->iocp, new_socket);
  if (iocpd) {
    if (io->trace)
      io_log("Stale external socket reference discarded\n");
    // Re-use means former socket instance was closed
    assert(iocpd->ops_in_progress == 0);
    assert(iocpd->external);
    // Clean up the straggler as best we can
    pn_socket_t sock = iocpd->socket;
    iocpd->socket = INVALID_SOCKET;
    pni_iocpdesc_map_del(io->iocp, sock);  // may free the iocpdesc_t depending on refcount
  }
}


/*
 * This heavyweight surrogate pipe could be replaced with a normal Windows pipe
 * now that select() is no longer used.  If interrupt semantics are all that is
 * needed, a simple user space counter and reserved completion status would
 * probably suffice.
 */
static int pni_socket_pair(pn_io_t *io, SOCKET sv[2]);

int pn_pipe(pn_io_t *io, pn_socket_t *dest)
{
  int n = pni_socket_pair(io, dest);
  if (n) {
    pni_win32_error(io->error, "pipe", WSAGetLastError());
  }
  return n;
}

static void pn_configure_sock(pn_io_t *io, pn_socket_t sock) {
  //
  // Disable the Nagle algorithm on TCP connections.
  //
  int flag = 1;
  if (setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, (char *)&flag, sizeof(flag)) != 0) {
    perror("setsockopt");
  }

  u_long nonblock = 1;
  if (ioctlsocket(sock, FIONBIO, &nonblock)) {
    perror("ioctlsocket");
  }
}

static inline pn_socket_t pni_create_socket(int domain, int protocol);

static const char *amqp_service(const char *port) {
  // Help older Windows to know about amqp[s] ports
  if (port) {
    if (!strcmp("amqp", port)) return "5672";
    if (!strcmp("amqps", port)) return "5671";
  }
  return port;
}

pn_socket_t pn_listen(pn_io_t *io, const char *host, const char *port)
{
  struct addrinfo *addr;
  int code = getaddrinfo(host, amqp_service(port), NULL, &addr);
  if (code) {
    pn_error_format(io->error, PN_ERR, "getaddrinfo(%s, %s): %s\n", host, port, gai_strerror(code));
    return INVALID_SOCKET;
  }

  pn_socket_t sock = pni_create_socket(addr->ai_family, addr->ai_protocol);
  if (sock == INVALID_SOCKET) {
    pni_win32_error(io->error, "pni_create_socket", WSAGetLastError());
    return INVALID_SOCKET;
  }
  ensure_unique(io, sock);

  bool optval = 1;
  if (setsockopt(sock, SOL_SOCKET, SO_EXCLUSIVEADDRUSE, (const char *) &optval,
                 sizeof(optval)) == -1) {
    pni_win32_error(io->error, "setsockopt", WSAGetLastError());
    closesocket(sock);
    return INVALID_SOCKET;
  }

  if (bind(sock, addr->ai_addr, addr->ai_addrlen) == -1) {
    pni_win32_error(io->error, "bind", WSAGetLastError());
    freeaddrinfo(addr);
    closesocket(sock);
    return INVALID_SOCKET;
  }
  freeaddrinfo(addr);

  if (listen(sock, 50) == -1) {
    pni_win32_error(io->error, "listen", WSAGetLastError());
    closesocket(sock);
    return INVALID_SOCKET;
  }

  if (io->iocp->selector) {
    iocpdesc_t *iocpd = pni_iocpdesc_create(io->iocp, sock, false);
    if (!iocpd) {
      pn_i_error_from_errno(io->error, "register");
      closesocket(sock);
      return INVALID_SOCKET;
    }
    pni_iocpdesc_start(iocpd);
  }

  return sock;
}

pn_socket_t pn_connect(pn_io_t *io, const char *hostarg, const char *port)
{
  // convert "0.0.0.0" to "127.0.0.1" on Windows for outgoing sockets
  const char *host = strcmp("0.0.0.0", hostarg) ? hostarg : "127.0.0.1";

  struct addrinfo *addr;
  int code = getaddrinfo(host, amqp_service(port), NULL, &addr);
  if (code) {
    pn_error_format(io->error, PN_ERR, "getaddrinfo(%s, %s): %s", host, port, gai_strerror(code));
    return INVALID_SOCKET;
  }

  pn_socket_t sock = pni_create_socket(addr->ai_family, addr->ai_protocol);
  if (sock == INVALID_SOCKET) {
    pni_win32_error(io->error, "proton pni_create_socket", WSAGetLastError());
    freeaddrinfo(addr);
    return INVALID_SOCKET;
  }

  ensure_unique(io, sock);
  pn_configure_sock(io, sock);

  if (io->iocp->selector) {
    return pni_iocp_begin_connect(io->iocp, sock, addr, io->error);
  } else {
    if (connect(sock, addr->ai_addr, addr->ai_addrlen) != 0) {
      if (WSAGetLastError() != WSAEWOULDBLOCK) {
	pni_win32_error(io->error, "connect", WSAGetLastError());
	freeaddrinfo(addr);
	closesocket(sock);
	return INVALID_SOCKET;
      }
    }

    freeaddrinfo(addr);
    return sock;
  }
}

pn_socket_t pn_accept(pn_io_t *io, pn_socket_t listen_sock, char *name, size_t size)
{
  struct sockaddr_storage addr;
  socklen_t addrlen = sizeof(addr);
  iocpdesc_t *listend = pni_iocpdesc_map_get(io->iocp, listen_sock);
  pn_socket_t accept_sock;

  *name = '\0';
  if (listend)
    accept_sock = pni_iocp_end_accept(listend, (struct sockaddr *) &addr, &addrlen, &io->wouldblock, io->error);
  else {
    // User supplied socket
    accept_sock = accept(listen_sock, (struct sockaddr *) &addr, &addrlen);
    if (accept_sock == INVALID_SOCKET)
      pni_win32_error(io->error, "sync accept", WSAGetLastError());
  }

  if (accept_sock == INVALID_SOCKET)
    return accept_sock;

  int code = getnameinfo((struct sockaddr *) &addr, addrlen, io->host, NI_MAXHOST,
                         io->serv, NI_MAXSERV, 0);
  if (code)
    code = getnameinfo((struct sockaddr *) &addr, addrlen, io->host, NI_MAXHOST,
                       io->serv, NI_MAXSERV, NI_NUMERICHOST | NI_NUMERICSERV);
  if (code) {
    pn_error_format(io->error, PN_ERR, "getnameinfo: %s\n", gai_strerror(code));
    pn_close(io, accept_sock);
    return INVALID_SOCKET;
  } else {
    pn_configure_sock(io, accept_sock);
    snprintf(name, size, "%s:%s", io->host, io->serv);
    if (listend) {
      pni_iocpdesc_start(pni_iocpdesc_map_get(io->iocp, accept_sock));
    }
    return accept_sock;
  }
}

static inline pn_socket_t pni_create_socket(int domain, int protocol) {
  return socket(domain, SOCK_STREAM, protocol);
}

ssize_t pn_send(pn_io_t *io, pn_socket_t sockfd, const void *buf, size_t len) {
  ssize_t count;
  iocpdesc_t *iocpd = pni_iocpdesc_map_get(io->iocp, sockfd);
  if (iocpd) {
    count = pni_iocp_begin_write(iocpd, buf, len, &io->wouldblock, io->error);
  } else {
    count = send(sockfd, (const char *) buf, len, 0);
    io->wouldblock = count < 0 && WSAGetLastError() == WSAEWOULDBLOCK;
  }
  return count;
}

ssize_t pn_recv(pn_io_t *io, pn_socket_t socket, void *buf, size_t size)
{
  ssize_t count;
  iocpdesc_t *iocpd = pni_iocpdesc_map_get(io->iocp, socket);
  if (iocpd) {
    count = pni_iocp_recv(iocpd, buf, size, &io->wouldblock, io->error);
  } else {
    count = recv(socket, (char *) buf, size, 0);
    io->wouldblock = count < 0 && WSAGetLastError() == WSAEWOULDBLOCK;
  }
  return count;
}

ssize_t pn_write(pn_io_t *io, pn_socket_t socket, const void *buf, size_t size)
{
  // non-socket io is mapped to socket io for now.  See pn_pipe()
  return pn_send(io, socket, buf, size);
}

ssize_t pn_read(pn_io_t *io, pn_socket_t socket, void *buf, size_t size)
{
  return pn_recv(io, socket, buf, size);
}

void pn_close(pn_io_t *io, pn_socket_t socket)
{
  iocpdesc_t *iocpd = pni_iocpdesc_map_get(io->iocp, socket);
  if (iocpd)
    pni_iocp_begin_close(iocpd);
  else {
    closesocket(socket);
  }
}

bool pn_wouldblock(pn_io_t *io)
{
  return io->wouldblock;
}

pn_selector_t *pn_io_selector(pn_io_t *io)
{
  if (io->iocp->selector == NULL)
    io->iocp->selector = pni_selector_create(io->iocp);
  return io->iocp->selector;
}

static void configure_pipe_socket(pn_io_t *io, pn_socket_t sock)
{
  u_long v = 1;
  ioctlsocket (sock, FIONBIO, &v);
  ensure_unique(io, sock);
  iocpdesc_t *iocpd = pni_iocpdesc_create(io->iocp, sock, false);
  pni_iocpdesc_start(iocpd);
}


static int pni_socket_pair (pn_io_t *io, SOCKET sv[2]) {
  // no socketpair on windows.  provide pipe() semantics using sockets
  struct protoent * pe_tcp = getprotobyname("tcp");
  if (pe_tcp == NULL) {
    perror("getprotobyname");
    return -1;
  }

  SOCKET sock = socket(AF_INET, SOCK_STREAM, pe_tcp->p_proto);
  if (sock == INVALID_SOCKET) {
    perror("socket");
    return -1;
  }

  BOOL b = 1;
  if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (const char *) &b, sizeof(b)) == -1) {
    perror("setsockopt");
    closesocket(sock);
    return -1;
  }
  else {
    struct sockaddr_in addr = {0};
    addr.sin_family = AF_INET;
    addr.sin_port = 0;
    addr.sin_addr.s_addr = htonl (INADDR_LOOPBACK);

    if (bind(sock, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
      perror("bind");
      closesocket(sock);
      return -1;
    }
  }

  if (listen(sock, 50) == -1) {
    perror("listen");
    closesocket(sock);
    return -1;
  }

  if ((sv[1] = socket(AF_INET, SOCK_STREAM, pe_tcp->p_proto)) == INVALID_SOCKET) {
    perror("sock1");
    closesocket(sock);
    return -1;
  }
  else {
    struct sockaddr addr = {0};
    int l = sizeof(addr);
    if (getsockname(sock, &addr, &l) == -1) {
      perror("getsockname");
      closesocket(sock);
      return -1;
    }

    if (connect(sv[1], &addr, sizeof(addr)) == -1) {
      int err = WSAGetLastError();
      fprintf(stderr, "connect wsaerrr %d\n", err);
      closesocket(sock);
      closesocket(sv[1]);
      return -1;
    }

    if ((sv[0] = accept(sock, &addr, &l)) == INVALID_SOCKET) {
      perror("accept");
      closesocket(sock);
      closesocket(sv[1]);
      return -1;
    }
  }

  configure_pipe_socket(io, sv[0]);
  configure_pipe_socket(io, sv[1]);
  closesocket(sock);
  return 0;
}
