blob: 2a3f8783136fe830e3997aaf1b47135d1b77c4bc [file] [log] [blame]
/** @file
A brief file description
@section license License
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
/****************************************************************************
OneWayMultiTunnel.h
****************************************************************************/
#include "P_EventSystem.h"
#include "I_OneWayMultiTunnel.h"
// #define TEST
//////////////////////////////////////////////////////////////////////////////
//
// OneWayMultiTunnel::OneWayMultiTunnel()
//
//////////////////////////////////////////////////////////////////////////////
ClassAllocator<OneWayMultiTunnel> OneWayMultiTunnelAllocator("OneWayMultiTunnelAllocator");
OneWayMultiTunnel::OneWayMultiTunnel() : OneWayTunnel()
{
ink_zero(vioTargets);
}
void
OneWayMultiTunnel::init(VConnection *vcSource, VConnection **vcTargets, int n_vcTargets, Continuation *aCont, int size_estimate,
int64_t nbytes, bool asingle_buffer, /* = true */
bool aclose_source, /* = false */
bool aclose_targets, /* = false */
Transform_fn aManipulate_fn, int water_mark)
{
mutex = aCont ? aCont->mutex : make_ptr(new_ProxyMutex());
cont = aCont;
manipulate_fn = aManipulate_fn;
close_source = aclose_source;
close_target = aclose_targets;
source_read_previously_completed = false;
SET_HANDLER(&OneWayMultiTunnel::startEvent);
n_connections = n_vioTargets + 1;
int64_t size_index = 0;
if (size_estimate) {
size_index = buffer_size_to_index(size_estimate, BUFFER_SIZE_INDEX_32K);
} else {
size_index = BUFFER_SIZE_INDEX_32K;
}
tunnel_till_done = (nbytes == TUNNEL_TILL_DONE);
MIOBuffer *buf1 = new_MIOBuffer(size_index);
MIOBuffer *buf2 = nullptr;
single_buffer = asingle_buffer;
if (single_buffer) {
buf2 = buf1;
} else {
buf2 = new_MIOBuffer(size_index);
}
topOutBuffer.writer_for(buf2);
buf1->water_mark = water_mark;
vioSource = vcSource->do_io_read(this, nbytes, buf1);
ink_assert(n_vcTargets <= ONE_WAY_MULTI_TUNNEL_LIMIT);
for (int i = 0; i < n_vcTargets; i++) {
vioTargets[i] = vc_do_io_write(vcTargets[i], this, INT64_MAX, buf2, 0);
}
return;
}
void
OneWayMultiTunnel::init(Continuation *aCont, VIO *SourceVio, VIO **TargetVios, int n_TargetVios, bool aclose_source,
bool aclose_targets)
{
mutex = aCont ? aCont->mutex : make_ptr(new_ProxyMutex());
cont = aCont;
single_buffer = true;
manipulate_fn = nullptr;
n_connections = n_TargetVios + 1;
;
close_source = aclose_source;
close_target = aclose_targets;
// The read on the source vio may have already been completed, yet
// we still need to write data into the target buffers. Note this
// fact as we'll not get a VC_EVENT_READ_COMPLETE callback later.
source_read_previously_completed = (SourceVio->ntodo() == 0);
tunnel_till_done = true;
n_vioTargets = n_TargetVios;
topOutBuffer.writer_for(SourceVio->buffer.writer());
// do_io_read() read already posted on vcSource.
// do_io_write() already posted on vcTargets
SET_HANDLER(&OneWayMultiTunnel::startEvent);
SourceVio->set_continuation(this);
vioSource = SourceVio;
for (int i = 0; i < n_vioTargets; i++) {
vioTargets[i] = TargetVios[i];
vioTargets[i]->set_continuation(this);
}
}
//////////////////////////////////////////////////////////////////////////////
//
// int OneWayMultiTunnel::startEvent()
//
//////////////////////////////////////////////////////////////////////////////
int
OneWayMultiTunnel::startEvent(int event, void *data)
{
VIO *vio = static_cast<VIO *>(data);
int ret = VC_EVENT_DONE;
int result = 0;
#ifdef TEST
const char *event_origin = (vio == vioSource ? "source" : "target"), *event_name = get_vc_event_name(event);
printf("OneWayMultiTunnel::startEvent --- %s received from %s VC\n", event_name, event_origin);
#endif
// handle the event
//
switch (event) {
case VC_EVENT_READ_READY: { // SunCC uses old scoping rules
transform(vioSource->buffer, topOutBuffer);
for (int i = 0; i < n_vioTargets; i++) {
if (vioTargets[i]) {
vioTargets[i]->reenable();
}
}
ret = VC_EVENT_CONT;
break;
}
case VC_EVENT_WRITE_READY:
if (vioSource) {
vioSource->reenable();
}
ret = VC_EVENT_CONT;
break;
case VC_EVENT_EOS:
if (!tunnel_till_done && vio->ntodo()) {
goto Lerror;
}
if (vio == vioSource) {
transform(vioSource->buffer, topOutBuffer);
goto Lread_complete;
} else {
goto Lwrite_complete;
}
// fallthrough
Lread_complete:
case VC_EVENT_READ_COMPLETE: { // SunCC uses old scoping rules
// set write nbytes to the current buffer size
//
for (int i = 0; i < n_vioTargets; i++) {
if (vioTargets[i]) {
vioTargets[i]->nbytes = vioTargets[i]->ndone + vioTargets[i]->buffer.reader()->read_avail();
vioTargets[i]->reenable();
}
}
close_source_vio(0);
ret = VC_EVENT_DONE;
break;
}
Lwrite_complete:
case VC_EVENT_WRITE_COMPLETE:
close_target_vio(0, static_cast<VIO *>(data));
if ((n_connections == 0) || (n_connections == 1 && source_read_previously_completed)) {
goto Ldone;
} else if (vioSource) {
vioSource->reenable();
}
break;
Lerror:
case VC_EVENT_ERROR:
case VC_EVENT_INACTIVITY_TIMEOUT:
case VC_EVENT_ACTIVE_TIMEOUT:
result = -1;
Ldone:
close_source_vio(result);
close_target_vio(result);
connection_closed(result);
break;
default:
ret = VC_EVENT_CONT;
break;
}
#ifdef TEST
printf(" (OneWayMultiTunnel returning value: %s)\n", (ret == VC_EVENT_DONE ? "VC_EVENT_DONE" : "VC_EVENT_CONT"));
#endif
return (ret);
}
void
OneWayMultiTunnel::close_target_vio(int result, VIO *vio)
{
for (int i = 0; i < n_vioTargets; i++) {
VIO *v = vioTargets[i];
if (v && (!vio || v == vio)) {
if (last_connection() || !single_buffer) {
free_MIOBuffer(v->buffer.writer());
}
if (close_target) {
v->vc_server->do_io_close();
}
vioTargets[i] = nullptr;
n_connections--;
}
}
}
void
OneWayMultiTunnel::reenable_all()
{
for (int i = 0; i < n_vioTargets; i++) {
if (vioTargets[i]) {
vioTargets[i]->reenable();
}
}
if (vioSource) {
vioSource->reenable();
}
}