/** @file

  One-way tunnel that forwards data read from a single source to multiple targets.

  @section license License

  Licensed to the Apache Software Foundation (ASF) under one
  or more contributor license agreements.  See the NOTICE file
  distributed with this work for additional information
  regarding copyright ownership.  The ASF licenses this file
  to you under the Apache License, Version 2.0 (the
  "License"); you may not use this file except in compliance
  with the License.  You may obtain a copy of the License at

      http://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License.
 */

/****************************************************************************

   OneWayMultiTunnel implementation
 ****************************************************************************/

#include "P_EventSystem.h"
#include "I_OneWayMultiTunnel.h"

// #define TEST

//////////////////////////////////////////////////////////////////////////////
//
//      OneWayMultiTunnel::OneWayMultiTunnel()
//
//////////////////////////////////////////////////////////////////////////////

// Class allocator instance for OneWayMultiTunnel objects.
ClassAllocator<OneWayMultiTunnel> OneWayMultiTunnelAllocator("OneWayMultiTunnelAllocator");

// Construct a tunnel whose target slots are all empty (null).
OneWayMultiTunnel::OneWayMultiTunnel() : OneWayTunnel()
{
  for (auto &slot : vioTargets) {
    slot = nullptr;
  }
}

/**
  Initialize the tunnel by issuing a read on the source and a write on
  every target.

  @param vcSource connection to read from; a read of @a nbytes is posted on it.
  @param vcTargets array of connections to write to.
  @param n_vcTargets number of entries in @a vcTargets; must not exceed
    ONE_WAY_MULTI_TUNNEL_LIMIT.
  @param aCont continuation stored in @c cont; its mutex is shared when it is
    non-null, otherwise a new ProxyMutex is created.
  @param size_estimate when non-zero, converted with buffer_size_to_index()
    to choose the buffer size index; when zero BUFFER_SIZE_INDEX_32K is used.
  @param nbytes number of bytes to read from the source; TUNNEL_TILL_DONE
    sets @c tunnel_till_done.
  @param asingle_buffer when true the source and the targets use the same
    MIOBuffer; when false a second buffer is allocated for the targets.
  @param aclose_source stored in @c close_source.
  @param aclose_targets stored in @c close_target.
  @param aManipulate_fn stored in @c manipulate_fn.
  @param water_mark water mark applied to the source-side buffer.
*/
void
OneWayMultiTunnel::init(VConnection *vcSource, VConnection **vcTargets, int n_vcTargets, Continuation *aCont, int size_estimate,
                        int64_t nbytes, bool asingle_buffer, /* = true */
                        bool aclose_source,                  /* = false */
                        bool aclose_targets,                 /* = false */
                        Transform_fn aManipulate_fn, int water_mark)
{
  mutex                            = aCont ? aCont->mutex : make_ptr(new_ProxyMutex());
  cont                             = aCont;
  manipulate_fn                    = aManipulate_fn;
  close_source                     = aclose_source;
  close_target                     = aclose_targets;
  source_read_previously_completed = false;

  SET_HANDLER(&OneWayMultiTunnel::startEvent);

  // Record the target count before deriving n_connections from it. Every
  // loop over vioTargets in this class is bounded by n_vioTargets, so leaving
  // it at a stale value would make the targets created below invisible.
  n_vioTargets  = n_vcTargets;
  n_connections = n_vioTargets + 1; // all targets plus the source

  int64_t size_index = 0;
  if (size_estimate) {
    size_index = buffer_size_to_index(size_estimate, BUFFER_SIZE_INDEX_32K);
  } else {
    size_index = BUFFER_SIZE_INDEX_32K;
  }

  tunnel_till_done = (nbytes == TUNNEL_TILL_DONE);

  MIOBuffer *buf1 = new_MIOBuffer(size_index);
  MIOBuffer *buf2 = nullptr;

  single_buffer = asingle_buffer;

  // buf1 receives the data read from the source; buf2 is what the targets
  // are written from. In single-buffer mode they are the same buffer.
  if (single_buffer) {
    buf2 = buf1;
  } else {
    buf2 = new_MIOBuffer(size_index);
  }
  topOutBuffer.writer_for(buf2);

  buf1->water_mark = water_mark;

  vioSource = vcSource->do_io_read(this, nbytes, buf1);

  ink_assert(n_vcTargets <= ONE_WAY_MULTI_TUNNEL_LIMIT);
  for (int i = 0; i < n_vcTargets; i++) {
    // Every target is written from the same buf2.
    vioTargets[i] = vc_do_io_write(vcTargets[i], this, INT64_MAX, buf2, 0);
  }

  return;
}

/**
  Initialize the tunnel from I/O operations that have already been issued:
  the read on the source and the writes on the targets are taken over by
  making this tunnel their continuation.

  @param aCont continuation stored in @c cont; its mutex is shared when it is
    non-null, otherwise a new ProxyMutex is created.
  @param SourceVio VIO of the read already posted on the source.
  @param TargetVios array of VIOs of the writes already posted on the targets.
  @param n_TargetVios number of entries in @a TargetVios; must not exceed
    ONE_WAY_MULTI_TUNNEL_LIMIT.
  @param aclose_source stored in @c close_source.
  @param aclose_targets stored in @c close_target.
*/
void
OneWayMultiTunnel::init(Continuation *aCont, VIO *SourceVio, VIO **TargetVios, int n_TargetVios, bool aclose_source,
                        bool aclose_targets)
{
  // Same limit check the VConnection-based init() performs before filling
  // vioTargets; without it the copy loop below could write past the array.
  ink_assert(n_TargetVios <= ONE_WAY_MULTI_TUNNEL_LIMIT);

  mutex         = aCont ? aCont->mutex : make_ptr(new_ProxyMutex());
  cont          = aCont;
  single_buffer = true;
  manipulate_fn = nullptr;
  n_connections = n_TargetVios + 1; // all targets plus the source
  close_source  = aclose_source;
  close_target  = aclose_targets;
  // The read on the source vio may have already been completed, yet
  // we still need to write data into the target buffers.  Note this
  // fact as we'll not get a VC_EVENT_READ_COMPLETE callback later.
  source_read_previously_completed = (SourceVio->ntodo() == 0);
  tunnel_till_done                 = true;
  n_vioTargets                     = n_TargetVios;
  topOutBuffer.writer_for(SourceVio->buffer.writer());

  // do_io_read() read already posted on vcSource.
  // do_io_write() already posted on vcTargets
  SET_HANDLER(&OneWayMultiTunnel::startEvent);

  SourceVio->set_continuation(this);
  vioSource = SourceVio;

  for (int i = 0; i < n_vioTargets; i++) {
    vioTargets[i] = TargetVios[i];
    vioTargets[i]->set_continuation(this);
  }
}

//////////////////////////////////////////////////////////////////////////////
//
//      int OneWayMultiTunnel::startEvent()
//
//////////////////////////////////////////////////////////////////////////////

/**
  Event handler installed by both init() overloads.

  @param event the VC_EVENT_* code being delivered.
  @param data cast to the VIO the event refers to.
  @return VC_EVENT_CONT for READ_READY, WRITE_READY and unrecognized events;
    VC_EVENT_DONE for every other path (the initial value of @c ret is left
    untouched there).
*/
int
OneWayMultiTunnel::startEvent(int event, void *data)
{
  VIO *vio   = static_cast<VIO *>(data);
  int ret    = VC_EVENT_DONE;
  int result = 0; // passed to the close calls on the Ldone path; -1 on error/timeout

#ifdef TEST
  const char *event_origin = (vio == vioSource ? "source" : "target"), *event_name = get_vc_event_name(event);
  printf("OneWayMultiTunnel::startEvent --- %s received from %s VC\n", event_name, event_origin);
#endif

  // handle the event
  //
  switch (event) {
  case VC_EVENT_READ_READY: { // SunCC uses old scoping rules
    // Run transform() from the source buffer to topOutBuffer, then reenable
    // every target that is still open.
    transform(vioSource->buffer, topOutBuffer);
    for (int i = 0; i < n_vioTargets; i++) {
      if (vioTargets[i]) {
        vioTargets[i]->reenable();
      }
    }
    ret = VC_EVENT_CONT;
    break;
  }

  case VC_EVENT_WRITE_READY:
    // A target signalled write-ready: reenable the source if it is still open.
    if (vioSource) {
      vioSource->reenable();
    }
    ret = VC_EVENT_CONT;
    break;

  case VC_EVENT_EOS:
    // EOS with bytes still to do is an error unless tunneling until done.
    if (!tunnel_till_done && vio->ntodo()) {
      goto Lerror;
    }
    // Otherwise treat EOS as completion of whichever side reported it.
    if (vio == vioSource) {
      transform(vioSource->buffer, topOutBuffer);
      goto Lread_complete;
    } else {
      goto Lwrite_complete;
    }
    // not reached: both branches above jump to a label

  Lread_complete:
  case VC_EVENT_READ_COMPLETE: { // SunCC uses old scoping rules
    // set write nbytes to the current buffer size
    //
    for (int i = 0; i < n_vioTargets; i++) {
      if (vioTargets[i]) {
        // bytes already written plus bytes still available to this target's reader
        vioTargets[i]->nbytes = vioTargets[i]->ndone + vioTargets[i]->buffer.reader()->read_avail();
        vioTargets[i]->reenable();
      }
    }
    close_source_vio(0);
    ret = VC_EVENT_DONE;
    break;
  }

  Lwrite_complete:
  case VC_EVENT_WRITE_COMPLETE:
    // Close only the target this event refers to.
    close_target_vio(0, static_cast<VIO *>(data));
    // Finish when nothing is left, or when the only remaining connection is
    // a source whose read had already completed before init().
    if ((n_connections == 0) || (n_connections == 1 && source_read_previously_completed)) {
      goto Ldone;
    } else if (vioSource) {
      vioSource->reenable();
    }
    break;

  Lerror:
  case VC_EVENT_ERROR:
  case VC_EVENT_INACTIVITY_TIMEOUT:
  case VC_EVENT_ACTIVE_TIMEOUT:
    result = -1;
  Ldone:
    // Tear down: close the source, then the targets, then report the result.
    // NOTE(review): close_target_vio() is called without a VIO here, relying
    // on a default argument declared elsewhere — presumably nullptr, which
    // closes every remaining target; confirm against the header.
    close_source_vio(result);
    close_target_vio(result);
    connection_closed(result);
    break;

  default:
    // Unrecognized events are ignored.
    ret = VC_EVENT_CONT;
    break;
  }
#ifdef TEST
  printf("    (OneWayMultiTunnel returning value: %s)\n", (ret == VC_EVENT_DONE ? "VC_EVENT_DONE" : "VC_EVENT_CONT"));
#endif
  return (ret);
}

/**
  Close one target, or every target that is still open.

  For each target closed, its write buffer is released when appropriate,
  the target connection is closed if @c close_target is set, the slot is
  cleared, and @c n_connections is decremented.

  @param result completion status supplied by the caller; not used by this
    implementation.
  @param vio the target to close, or nullptr to close all open targets.
*/
void
OneWayMultiTunnel::close_target_vio(int result, VIO *vio)
{
  for (int i = 0; i < n_vioTargets; i++) {
    VIO *v = vioTargets[i];
    if (!v || (vio && v != vio)) {
      continue; // empty slot, or not the target that was asked for
    }

    auto *writer      = v->buffer.writer();
    bool release_buf = last_connection();
    if (!release_buf && !single_buffer) {
      // The VConnection-based init() hands the same MIOBuffer to every
      // target, so the buffer may only be released once no other open target
      // still refers to it. Freeing it per target would release it
      // repeatedly and pull it out from under targets still writing.
      release_buf = true;
      for (int j = 0; j < n_vioTargets; j++) {
        if (j != i && vioTargets[j] && vioTargets[j]->buffer.writer() == writer) {
          release_buf = false;
          break;
        }
      }
    }
    if (release_buf) {
      free_MIOBuffer(writer);
    }

    if (close_target) {
      v->vc_server->do_io_close();
    }
    vioTargets[i] = nullptr;
    n_connections--;
  }
}

void
OneWayMultiTunnel::reenable_all()
{
  for (int i = 0; i < n_vioTargets; i++) {
    if (vioTargets[i]) {
      vioTargets[i]->reenable();
    }
  }
  if (vioSource) {
    vioSource->reenable();
  }
}
