/** @file
ClusterHandler base support: control message allocation, cluster I/O state, and the node-to-node connect protocol
@section license License
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
/****************************************************************************
ClusterHandlerBase.cc
****************************************************************************/
#include "P_Cluster.h"
extern int cluster_receive_buffer_size;
extern int cluster_send_buffer_size;
extern uint32_t cluster_sockopt_flags;
extern uint32_t cluster_packet_mark;
extern uint32_t cluster_packet_tos;
extern int num_of_cluster_threads;
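// Cluster configuration knobs; these are defined elsewhere and presumably
// loaded from the cluster configuration records at startup.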
///////////////////////////////////////////////////////////////
// Incoming message continuation for periodic callout threads
///////////////////////////////////////////////////////////////
ClusterCalloutContinuation::ClusterCalloutContinuation(struct ClusterHandler *ch)
: Continuation(0), _ch(ch)
{
mutex = new_ProxyMutex();
SET_HANDLER((ClstCoutContHandler) & ClusterCalloutContinuation::CalloutHandler);
}
ClusterCalloutContinuation::~ClusterCalloutContinuation()
{
mutex = 0;
}
int
ClusterCalloutContinuation::CalloutHandler(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
{
return _ch->process_incoming_callouts(this->mutex);
}
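// Usage sketch: startClusterEvent() below creates one of these per
// completion-callback slot and drives it from a periodic ET_NET event, so
// incoming control messages are processed outside the cluster threads:
//
//   callout_cont[n]   = NEW(new ClusterCalloutContinuation(this));
//   callout_events[n] = eventProcessor.schedule_every(callout_cont[n],
//                                                     COMPLETION_CALLBACK_PERIOD, ET_NET);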
/*************************************************************************/
// ClusterControl member functions (Internal Class)
/*************************************************************************/
ClusterControl::ClusterControl():
Continuation(NULL), len(0), size_index(-1), real_data(0), data(0), free_proc(0), free_proc_arg(0), iob_block(0)
{
}
void
ClusterControl::real_alloc_data(int read_access, bool align_int32_on_non_int64_boundary)
{
EThread *thread = this_ethread();
ProxyMutex *mutex = thread->mutex;
ink_assert(!data);
if ((len + DATA_HDR + sizeof(int32_t)) <= DEFAULT_MAX_BUFFER_SIZE) {
size_index = buffer_size_to_index(len + DATA_HDR + sizeof(int32_t), MAX_BUFFER_SIZE_INDEX);
iob_block = new_IOBufferBlock();
iob_block->alloc(size_index); // aligns on 8 byte boundary
real_data = (int64_t *) iob_block->buf();
if (align_int32_on_non_int64_boundary) {
data = ((char *) real_data) + sizeof(int32_t) + DATA_HDR;
} else {
data = ((char *) real_data) + DATA_HDR;
}
} else {
int size = sizeof(int64_t) * (((len + DATA_HDR + sizeof(int32_t) + sizeof(int64_t) - 1) / sizeof(int64_t)) + 1);
size_index = -1;
iob_block = new_IOBufferBlock();
iob_block->alloc(BUFFER_SIZE_FOR_XMALLOC(size));
real_data = (int64_t *) iob_block->buf();
if (align_int32_on_non_int64_boundary) {
data = (char *) DOUBLE_ALIGN(real_data) + sizeof(int32_t) + DATA_HDR;
} else {
data = (char *) DOUBLE_ALIGN(real_data) + DATA_HDR;
}
CLUSTER_INCREMENT_DYN_STAT(CLUSTER_ALLOC_DATA_NEWS_STAT);
}
// IOBufferBlock adjustments
if (read_access) {
// Make iob_block->read_avail() == len
iob_block->fill((char *) data - (char *) real_data); // skip header
iob_block->consume((char *) data - (char *) real_data); // skip header
iob_block->fill(len);
} else {
// Make iob_block->write_avail() == len
iob_block->fill((char *) data - (char *) real_data); // skip header
iob_block->consume((char *) data - (char *) real_data); // skip header
iob_block->_buf_end = iob_block->end() + len;
}
// Write size_index, magic number and 'this' in leading bytes
char *size_index_ptr = (char *) data - DATA_HDR;
*size_index_ptr = size_index;
++size_index_ptr;
*size_index_ptr = (char) ALLOC_DATA_MAGIC;
++size_index_ptr;
void *val = (void *) this;
memcpy(size_index_ptr, (char *) &val, sizeof(void *));
}
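// Layout produced by real_alloc_data() above (DATA_HDR bytes of metadata
// immediately precede 'data'; the optional int32 pad shifts 'data' off an
// int64 boundary when requested):
//
//   real_data                                   data
//   |                                           |
//   v                                           v
//   [pad?][size_index:1][ALLOC_DATA_MAGIC:1][this:sizeof(void *)][payload: len bytes]
//
// free_data() below re-validates the magic byte and size_index before the
// underlying IOBufferBlock is released.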
void
ClusterControl::free_data()
{
if (data && iob_block) {
if (free_proc) {
// Free memory via callback proc
(*free_proc) (free_proc_arg);
iob_block = 0; // really free memory
return;
}
if (real_data) {
ink_release_assert(*(((uint8_t *) data) - DATA_HDR + 1) == (uint8_t) ALLOC_DATA_MAGIC);
*(((uint8_t *) data) - DATA_HDR + 1) = (uint8_t) ~ ALLOC_DATA_MAGIC;
ink_release_assert(*(((char *) data) - DATA_HDR) == size_index);
} else {
// malloc'ed memory, not allocated via real_alloc_data().
// Data will be ats_free()'ed when the IOBufferBlock is freed
}
iob_block = 0; // free memory
}
}
/*************************************************************************/
// IncomingControl member functions (Internal Class)
/*************************************************************************/
IncomingControl *
IncomingControl::alloc()
{
return inControlAllocator.alloc();
}
IncomingControl::IncomingControl()
:recognized_time(0)
{
}
void
IncomingControl::freeall()
{
free_data();
inControlAllocator.free(this);
}
/*************************************************************************/
// OutgoingControl member functions (Internal Class)
/*************************************************************************/
OutgoingControl *
OutgoingControl::alloc()
{
return outControlAllocator.alloc();
}
OutgoingControl::OutgoingControl()
:ch(NULL), submit_time(0)
{
}
int
OutgoingControl::startEvent(int event, Event * e)
{
//
// This event handler is used by ClusterProcessor::invoke_remote()
// to delay (CLUSTER_OPT_DELAY) the enqueuing of the control message.
//
(void) event;
(void) e;
// verify that the machine has not gone down
if (!ch || !ch->thread)
return EVENT_DONE;
int32_t cluster_fn = *(int32_t *) this->data;
int32_t pri = ClusterFuncToQpri(cluster_fn);
ink_atomiclist_push(&ch->outgoing_control_al[pri], (void *) this);
return EVENT_DONE;
}
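// Note: the atomic push lets invoke_remote() enqueue from any thread
// without holding the ClusterHandler lock; the cluster thread presumably
// drains outgoing_control_al[pri] when assembling the next write message.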
void
OutgoingControl::freeall()
{
free_data();
outControlAllocator.free(this);
}
/*************************************************************************/
// ClusterState member functions (Internal Class)
/*************************************************************************/
ClusterState::ClusterState(ClusterHandler * c, bool read_chan):
Continuation(0),
ch(c),
read_channel(read_chan),
do_iodone_event(false),
n_descriptors(0),
sequence_number(0),
to_do(0),
did(0),
n_iov(0),
io_complete(1),
io_complete_event(0),
v(0),
bytes_xfered(0),
last_ndone(0),
total_bytes_xfered(0),
iov(NULL),
iob_iov(NULL),
byte_bank(NULL),
n_byte_bank(0),
byte_bank_size(0),
missed(0),
missed_msg(false),
read_state_t(READ_START),
write_state_t(WRITE_START)
{
mutex = new_ProxyMutex();
if (read_channel) {
state = ClusterState::READ_START;
SET_HANDLER(&ClusterState::doIO_read_event);
} else {
state = ClusterState::WRITE_START;
SET_HANDLER(&ClusterState::doIO_write_event);
}
last_time = HRTIME_SECONDS(0);
start_time = HRTIME_SECONDS(0);
int size;
//
// Note: we allocate space for the maximum number of iovec(s) and
// descriptor(s), plus small control message data.
//
//////////////////////////////////////////////////
// Place an invalid page in front of iovec data.
//////////////////////////////////////////////////
size_t pagesize = ats_pagesize();
size = ((MAX_TCOUNT + 1) * sizeof(IOVec)) + (2 * pagesize);
iob_iov = new_IOBufferData(BUFFER_SIZE_FOR_XMALLOC(size));
char *addr = (char *) align_pointer_forward(iob_iov->data(), pagesize);
#if defined(__sparc)
if (mprotect(addr, pagesize, PROT_NONE))
perror("ClusterState mprotect0 failed");
#endif
iov = (IOVec *) (addr + pagesize);
///////////////////////////////////////////////////
// Place an invalid page in front of message data.
///////////////////////////////////////////////////
size = sizeof(ClusterMsgHeader) + (MAX_TCOUNT + 1) * sizeof(Descriptor)
+ CONTROL_DATA + (2 * pagesize);
msg.iob_descriptor_block = new_IOBufferBlock();
msg.iob_descriptor_block->alloc(BUFFER_SIZE_FOR_XMALLOC(size));
addr = (char *) align_pointer_forward(msg.iob_descriptor_block->data->data(), pagesize);
#if defined(__sparc)
if (mprotect(addr, pagesize, PROT_NONE))
perror("ClusterState mprotect failed");
#endif
addr = addr + pagesize;
memset(addr, 0, size - (2 * pagesize));
msg.descriptor = (Descriptor *) (addr + sizeof(ClusterMsgHeader));
mbuf = new_empty_MIOBuffer();
}
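// Buffer layout created above; the PROT_NONE guard page (mapped only on
// platforms where the mprotect() calls are compiled in) traps stray writes
// that would otherwise corrupt the iovec/descriptor data:
//
//   iob_iov:              [align pad][guard page][iovec array: MAX_TCOUNT + 1]
//   iob_descriptor_block: [align pad][guard page][ClusterMsgHeader][descriptors: MAX_TCOUNT + 1][control data]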
ClusterState::~ClusterState()
{
mutex = 0;
#if defined(__sparc)
int pagesize = ats_pagesize();
#endif
if (iov) {
#if defined(__sparc)
iov = (IOVec *) ((char *) iov - pagesize);
if (mprotect((char *) iov, pagesize, (PROT_READ | PROT_WRITE)))
perror("~ClusterState mprotect0 failed");
#endif
iob_iov = 0; // Free memory
}
if (msg.descriptor) {
#if defined(__sparc)
char *a = (char *) msg.descriptor - (sizeof(ClusterMsgHeader) + pagesize);
if (mprotect(a, pagesize, (PROT_READ | PROT_WRITE)))
perror("~ClusterState mprotect failed");
#endif
msg.iob_descriptor_block = 0; // Free memory
}
// Deallocate IO Core structures
int n;
for (n = 0; n < MAX_TCOUNT; ++n) {
block[n] = 0;
}
free_empty_MIOBuffer(mbuf);
mbuf = 0;
}
void
ClusterState::build_do_io_vector()
{
//
// Construct the do_io_xxx data structures allowing transfer
// of the data described by the iovec structure.
//
int bytes_to_xfer = 0;
int n;
IOBufferBlock *last_block = 0;
mbuf->clear();
// Build the IOBufferBlock chain.
for (n = 0; n < n_iov; ++n) {
bytes_to_xfer += iov[n].iov_len;
if (last_block) {
last_block->next = block[n];
}
last_block = block[n];
while (last_block->next) {
last_block = last_block->next;
}
}
mbuf->_writer = block[0];
ink_release_assert(bytes_to_xfer == to_do);
ink_assert(bytes_to_xfer == bytes_IOBufferBlockList(mbuf->_writer, !read_channel));
}
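// REENABLE_IO() resumes a partially completed VIO. Three build variants:
// CLUSTER_TOMCAT skips the reenable when running on a stolen thread,
// CLUSTER_IMMEDIATE_NETIO reenables the underlying NetVConnection
// directly, and the default case uses the ordinary reenable_re() path.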
#ifdef CLUSTER_TOMCAT
#define REENABLE_IO() \
if (!ch->on_stolen_thread && !io_complete) { \
v->reenable_re(); \
}
#else // !CLUSTER_TOMCAT
#ifdef CLUSTER_IMMEDIATE_NETIO
#define REENABLE_IO() \
if (!io_complete) { \
((NetVConnection *) v->vc_server)->reenable_re_now(v); \
}
#else // !CLUSTER_IMMEDIATE_NETIO
#define REENABLE_IO() \
if (!io_complete) { \
v->reenable_re(); \
}
#endif // !CLUSTER_IMMEDIATE_NETIO
#endif // !CLUSTER_TOMCAT
int
ClusterState::doIO()
{
ink_release_assert(io_complete);
#if !defined(CLUSTER_IMMEDIATE_NETIO)
MUTEX_TRY_LOCK(lock, this->mutex, this_ethread());
if (!lock) {
return 0; // unable to initiate operation
}
#endif
if (!ch->net_vc) {
// Node has gone down, simulate successful transfer
io_complete = 1;
bytes_xfered += to_do;
to_do = 0;
return 1;
}
//
// Setup and initiate or resume Cluster i/o request to the NetProcessor.
//
if ((to_do && (io_complete_event == VC_EVENT_READ_READY)) || (io_complete_event == VC_EVENT_WRITE_READY)) {
if (read_channel) {
// Partial read case
ink_assert(v->buffer.writer()->current_write_avail() == to_do);
} else {
// Partial write case
ink_assert(v->buffer.reader()->read_avail() == to_do);
}
// Resume operation
v->nbytes = to_do + did;
ink_release_assert(v->nbytes > v->ndone);
io_complete = false;
io_complete_event = 0;
REENABLE_IO();
} else {
// Start new do_io_xxx operation.
// Initialize globals
io_complete = false;
io_complete_event = 0;
bytes_xfered = 0;
last_ndone = 0;
build_do_io_vector();
if (read_channel) {
ink_assert(mbuf->current_write_avail() == to_do);
#ifdef CLUSTER_IMMEDIATE_NETIO
v = ch->net_vc->do_io_read_now(this, to_do, mbuf);
#else
v = ch->net_vc->do_io_read(this, to_do, mbuf);
#endif
REENABLE_IO();
} else {
IOBufferReader *r = mbuf->alloc_reader();
r->block = mbuf->_writer;
ink_assert(r->read_avail() == to_do);
#ifdef CLUSTER_IMMEDIATE_NETIO
v = ch->net_vc->do_io_write_now(this, to_do, r);
#else
v = ch->net_vc->do_io_write(this, to_do, r);
#endif
REENABLE_IO();
}
}
return 1; // operation initiated
}
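// doIO() returns 0 when the state mutex could not be acquired (the caller
// is expected to retry) and 1 when an operation was initiated or, for a
// downed node, simulated. Completion is signaled by the handlers below:
// io_complete_event is stored first, then a write memory barrier, then
// io_complete is set, so a thread polling io_complete always observes a
// consistent event code.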
int
ClusterState::doIO_read_event(int event, void *d)
{
ink_release_assert(!io_complete);
if (!v) {
v = (VIO *) d; // Immediate callback on first NetVC read
}
ink_assert((VIO *) d == v);
switch (event) {
case VC_EVENT_READ_READY:
{
// Disable read processing
v->nbytes = v->ndone;
// fall through
}
case VC_EVENT_READ_COMPLETE:
{
bytes_xfered = v->ndone - last_ndone;
if (bytes_xfered) {
total_bytes_xfered += bytes_xfered;
did += bytes_xfered;
to_do -= bytes_xfered;
}
last_ndone = v->ndone;
io_complete_event = event;
INK_WRITE_MEMORY_BARRIER;
io_complete = 1;
IOComplete();
break;
}
case VC_EVENT_EOS:
case VC_EVENT_ERROR:
case VC_EVENT_INACTIVITY_TIMEOUT:
case VC_EVENT_ACTIVE_TIMEOUT:
default:
{
io_complete_event = event;
INK_WRITE_MEMORY_BARRIER;
io_complete = -1;
IOComplete();
break;
}
} // End of switch
return EVENT_DONE;
}
int
ClusterState::doIO_write_event(int event, void *d)
{
ink_release_assert(!io_complete);
if (!v) {
v = (VIO *) d; // Immediate callback on first NetVC write
}
ink_assert((VIO *) d == v);
switch (event) {
case VC_EVENT_WRITE_READY:
#ifdef CLUSTER_IMMEDIATE_NETIO
{
// Disable write processing
v->nbytes = v->ndone;
// fall through
}
#endif
case VC_EVENT_WRITE_COMPLETE:
{
bytes_xfered = v->ndone - last_ndone;
if (bytes_xfered) {
total_bytes_xfered += bytes_xfered;
did += bytes_xfered;
to_do -= bytes_xfered;
}
last_ndone = v->ndone;
#ifdef CLUSTER_IMMEDIATE_NETIO
io_complete_event = event;
INK_WRITE_MEMORY_BARRIER;
io_complete = 1;
IOComplete();
#else
if (event == VC_EVENT_WRITE_COMPLETE) {
io_complete_event = event;
INK_WRITE_MEMORY_BARRIER;
io_complete = 1;
IOComplete();
} else {
if (bytes_xfered) {
v->reenable_re(); // Immediate action
} else {
v->reenable();
}
return EVENT_DONE;
}
#endif
break;
}
case VC_EVENT_EOS:
case VC_EVENT_ERROR:
case VC_EVENT_INACTIVITY_TIMEOUT:
case VC_EVENT_ACTIVE_TIMEOUT:
default:
{
io_complete_event = event;
INK_WRITE_MEMORY_BARRIER;
io_complete = -1;
IOComplete();
break;
}
} // End of switch
return EVENT_DONE;
}
void
ClusterState::IOComplete()
{
// If no thread appears (approximate check) to be holding
// the ClusterHandler mutex (no cluster processing in progress)
// and immediate i/o completion events are allowed,
// start i/o completion processing.
if (do_iodone_event && !ch->mutex->thread_holding) {
MUTEX_TRY_LOCK(lock, ch->mutex, this_ethread());
if (lock) {
ch->handleEvent(EVENT_IMMEDIATE, (void *) 0);
} else {
eventProcessor.schedule_imm_signal(ch, ET_CLUSTER);
}
}
}
int
ClusterHandler::cluster_signal_and_update(int event, ClusterVConnection * vc, ClusterVConnState * s)
{
s->vio._cont->handleEvent(event, &s->vio);
if (vc->closed) {
if (!vc->write_list && !vc->write_bytes_in_transit) {
close_ClusterVConnection(vc);
}
return EVENT_DONE;
} else {
ink_assert((event != VC_EVENT_ERROR) || ((event == VC_EVENT_ERROR) && vc->closed));
return EVENT_CONT;
}
}
int
ClusterHandler::cluster_signal_and_update_locked(int event, ClusterVConnection * vc, ClusterVConnState * s)
{
// should assert we have s->vio.mutex
s->vio._cont->handleEvent(event, &s->vio);
if (vc->closed) {
if (!vc->write_list && !vc->write_bytes_in_transit) {
close_free_lock(vc, s);
}
return EVENT_DONE;
} else
return EVENT_CONT;
}
int
ClusterHandler::cluster_signal_error_and_update(ClusterVConnection * vc, ClusterVConnState * s, int lerrno)
{
s->enabled = 0;
vc->lerrno = lerrno;
return cluster_signal_and_update(VC_EVENT_ERROR, vc, s);
}
bool
ClusterHandler::check_channel(int c)
{
//
// Check to see that there is enough room to store channel c
//
while (n_channels <= c) {
int old_channels = n_channels;
if (!n_channels) {
n_channels = MIN_CHANNELS;
} else {
if ((n_channels * 2) <= MAX_CHANNELS) {
n_channels = n_channels * 2;
} else {
return false; // Limit exceeded
}
}
// Allocate ClusterVConnection table entries
channels = (ClusterVConnection **)ats_realloc(channels, n_channels * sizeof(ClusterVConnection *));
// Allocate ChannelData table entries
channel_data = (struct ChannelData **)ats_realloc(channel_data, n_channels * sizeof(struct ChannelData *));
for (int i = old_channels; i < n_channels; i++) {
if (local_channel(i)) {
if (i > LAST_DEDICATED_CHANNEL) {
channels[i] = (ClusterVConnection *) 1; // mark as invalid
channel_data[i] = (struct ChannelData *)ats_malloc(sizeof(struct ChannelData));
memset(channel_data[i], 0, sizeof(struct ChannelData));
channel_data[i]->channel_number = i;
free_local_channels.enqueue(channel_data[i]);
} else {
channels[i] = NULL;
channel_data[i] = NULL;
}
} else {
channels[i] = NULL;
channel_data[i] = NULL;
}
}
}
return true; // OK
}
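// The channel table grows geometrically: MIN_CHANNELS on first use, then
// doubling up to MAX_CHANNELS. Locally owned channels above
// LAST_DEDICATED_CHANNEL are marked with the sentinel
// (ClusterVConnection *) 1 and parked on free_local_channels until
// alloc_channel() hands them out.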
int
ClusterHandler::alloc_channel(ClusterVConnection * vc, int requested)
{
//
// Allocate a channel
//
struct ChannelData *cdp = 0;
int i = requested;
if (!i) {
int loops = 1;
do {
cdp = free_local_channels.dequeue();
if (!cdp) {
if (!check_channel(n_channels)) {
return -2; // Limit exceeded
}
} else {
ink_assert(cdp == channel_data[cdp->channel_number]);
i = cdp->channel_number;
break;
}
} while (loops--);
ink_release_assert(i != 0); // required
ink_release_assert(channels[i] == (ClusterVConnection *) 1); // required
Debug(CL_TRACE, "alloc_channel local chan=%d VC=%p", i, vc);
} else {
if (!check_channel(i)) {
return -2; // Limit exceeded
}
if (channels[i]) {
Debug(CL_TRACE, "alloc_channel remote inuse chan=%d VC=%p", i, vc);
return -1; // channel in use
} else {
Debug(CL_TRACE, "alloc_channel remote chan=%d VC=%p", i, vc);
}
}
channels[i] = vc;
vc->channel = i;
return i;
}
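// Return convention for alloc_channel(): >= 0 is the allocated channel
// number, -1 means a remotely requested channel is already in use, and
// -2 means the channel table limit was exceeded.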
void
ClusterHandler::free_channel(ClusterVConnection * vc)
{
//
// Free a channel
//
int i = vc->channel;
if (i > LAST_DEDICATED_CHANNEL && channels[i] == vc) {
if (local_channel(i)) {
channels[i] = (ClusterVConnection *) 1;
free_local_channels.enqueue(channel_data[i]);
Debug(CL_TRACE, "free_channel local chan=%d VC=%p", i, vc);
} else {
channels[i] = 0;
Debug(CL_TRACE, "free_channel remote chan=%d VC=%p", i, vc);
}
}
vc->channel = 0;
}
int
ClusterHandler::machine_down()
{
char textbuf[sizeof("255.255.255.255:65535")];
if (dead) {
return EVENT_DONE;
}
//
// Looks like this machine dropped out of the cluster.
// Deal with it.
// Fatal read/write errors on the node to node connection along
// with failure of the cluster membership check in the periodic event
// result in machine_down().
//
#ifdef LOCAL_CLUSTER_TEST_MODE
Note("machine down %u.%u.%u.%u:%d", DOT_SEPARATED(ip), port);
#else
Note("machine down %u.%u.%u.%u:%d", DOT_SEPARATED(ip), id);
#endif
machine_offline_APIcallout(ip);
snprintf(textbuf, sizeof(textbuf), "%hhu.%hhu.%hhu.%hhu:%d", DOT_SEPARATED(ip), port);
REC_SignalManager(REC_SIGNAL_MACHINE_DOWN, textbuf);
if (net_vc) {
net_vc->do_io(VIO::CLOSE);
net_vc = 0;
}
// Cancel pending cluster reads and writes
read.io_complete = -1;
write.io_complete = -1;
MUTEX_TAKE_LOCK(the_cluster_config_mutex, this_ethread());
ClusterConfiguration *c = this_cluster()->current_configuration();
machine->clusterHandlers[id] = NULL;
if ((--machine->now_connections == 0) && c->find(ip, port)) {
ClusterConfiguration *cc = configuration_remove_machine(c, machine);
CLUSTER_DECREMENT_DYN_STAT(CLUSTER_NODES_STAT);
this_cluster()->configurations.push(cc);
machine->dead = true;
}
MUTEX_UNTAKE_LOCK(the_cluster_config_mutex, this_ethread());
MachineList *cc = the_cluster_config();
if (cc && cc->find(ip, port) && connector) {
Debug(CL_NOTE, "cluster connect retry for %hhu.%hhu.%hhu.%hhu", DOT_SEPARATED(ip));
clusterProcessor.connect(ip, port, id);
}
return zombify(); // defer deletion of *this
}
int
ClusterHandler::zombify(Event * /* e ATS_UNUSED */)
{
//
// Node associated with *this is declared down, setup the event to cleanup
// and defer deletion of *this
//
dead = true;
if (cluster_periodic_event) {
cluster_periodic_event->cancel(this);
cluster_periodic_event = NULL;
}
clm->cancel_monitor();
SET_HANDLER((ClusterContHandler) & ClusterHandler::protoZombieEvent);
//
// At this point, allow the caller (process_read or process_write) to
// complete before performing the node-down actions.
//
eventProcessor.schedule_in(this, HRTIME_SECONDS(1), ET_CLUSTER);
return EVENT_DONE;
}
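// Teardown sequence, as sketched by the code above: machine_down()
// cancels pending i/o, drops the node from the cluster configuration and
// optionally schedules a reconnect, then zombify() switches the handler
// to protoZombieEvent, which drains remaining work before
// zombieClusterEvent finally deletes *this.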
int
ClusterHandler::connectClusterEvent(int event, Event * e)
{
if ((event == EVENT_IMMEDIATE) || (event == EVENT_INTERVAL)) {
//
// Attempt connect to target node and if successful, setup the event
// to initiate the node to node connection protocol.
// Initiated via ClusterProcessor::connect().
//
MachineList *cc = the_cluster_config();
if (!machine)
machine = NEW(new ClusterMachine(hostname, ip, port));
#ifdef LOCAL_CLUSTER_TEST_MODE
if (!(cc && cc->find(ip, port))) {
#else
if (this_cluster_machine()->ip == machine->ip || !(cc && cc->find(ip, port))) {
#endif
if (this_cluster_machine()->ip != machine->ip)
Debug(CL_NOTE, "cluster connect aborted, machine %u.%u.%u.%u not in cluster", DOT_SEPARATED(machine->ip));
delete machine;
machine = NULL;
delete this;
return EVENT_DONE;
}
// Connect to cluster member
Debug(CL_NOTE, "connect_re from %u.%u.%u.%u to %u.%u.%u.%u",
DOT_SEPARATED(this_cluster_machine()->ip), DOT_SEPARATED(machine->ip));
ip = machine->ip;
NetVCOptions opt;
opt.socket_send_bufsize = cluster_send_buffer_size;
opt.socket_recv_bufsize = cluster_receive_buffer_size;
opt.sockopt_flags = cluster_sockopt_flags;
opt.packet_mark = cluster_packet_mark;
opt.packet_tos = cluster_packet_tos;
opt.etype = ET_CLUSTER;
opt.addr_binding = NetVCOptions::INTF_ADDR;
opt.local_ip = this_cluster_machine()->ip;
struct sockaddr_in addr;
ats_ip4_set(&addr, machine->ip,
htons(machine->cluster_port ? machine->cluster_port : cluster_port));
// TODO: Should we check the Action* returned here?
netProcessor.connect_re(this, ats_ip_sa_cast(&addr), &opt);
return EVENT_DONE;
} else {
if (event == NET_EVENT_OPEN) {
net_vc = (NetVConnection *) e;
SET_HANDLER((ClusterContHandler) & ClusterHandler::startClusterEvent);
eventProcessor.schedule_imm(this, ET_CLUSTER);
return EVENT_DONE;
} else {
eventProcessor.schedule_in(this, CLUSTER_MEMBER_DELAY);
return EVENT_CONT;
}
}
}
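// Connect protocol state machine driven by startClusterEvent() below
// (fall-through between adjacent bind states is intentional):
//
//   CLCON_SEND_MSG -> CLCON_SEND_MSG_COMPLETE -> CLCON_READ_MSG
//     -> CLCON_READ_MSG_COMPLETE -> CLCON_VALIDATE_MSG
//     -> CLCON_CONN_BIND_CLEAR -> CLCON_CONN_BIND -> CLCON_CONN_BIND_OK
//
// Any failure funnels through CLCON_ABORT_CONNECT (which may schedule a
// reconnect) into CLCON_DELETE_CONNECT, which deletes the handler.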
int
ClusterHandler::startClusterEvent(int event, Event * e)
{
char textbuf[sizeof("255.255.255.255:65535")];
// Perform the node-to-node connection establishment protocol.
ink_assert(!read_vcs);
ink_assert(!write_vcs);
if (event == EVENT_IMMEDIATE) {
if (cluster_connect_state == ClusterHandler::CLCON_INITIAL) {
cluster_connect_state = ClusterHandler::CLCON_SEND_MSG;
} else {
ink_release_assert(!"startClusterEvent, EVENT_IMMEDIATE not expected");
}
} else {
ink_release_assert(event == EVENT_INTERVAL);
}
for (;;) {
switch (cluster_connect_state) {
////////////////////////////////////////////////////////////////////////////
case ClusterHandler::CLCON_INITIAL:
////////////////////////////////////////////////////////////////////////////
{
ink_release_assert(!"Invalid state [CLCON_INITIAL]");
}
////////////////////////////////////////////////////////////////////////////
case ClusterHandler::CLCON_SEND_MSG:
////////////////////////////////////////////////////////////////////////////
{
// Send initial message.
#ifdef LOCAL_CLUSTER_TEST_MODE
nodeClusteringVersion._port = cluster_port;
#endif
cluster_connect_state = ClusterHandler::CLCON_SEND_MSG_COMPLETE;
if (connector)
nodeClusteringVersion._id = id;
build_data_vector((char *) &nodeClusteringVersion, sizeof(nodeClusteringVersion), false);
if (!write.doIO()) {
// i/o not initiated, delay and retry
cluster_connect_state = ClusterHandler::CLCON_SEND_MSG;
eventProcessor.schedule_in(this, CLUSTER_PERIOD, ET_CLUSTER);
return EVENT_DONE;
}
break;
}
////////////////////////////////////////////////////////////////////////////
case ClusterHandler::CLCON_SEND_MSG_COMPLETE:
////////////////////////////////////////////////////////////////////////////
{
if (write.io_complete) {
if ((write.io_complete < 0)
|| ((size_t) write.did < sizeof(nodeClusteringVersion))) {
Debug(CL_NOTE, "unable to write to cluster node %u.%u.%u.%u: %d",
DOT_SEPARATED(ip), write.io_complete_event);
cluster_connect_state = ClusterHandler::CLCON_ABORT_CONNECT;
break; // goto next state
}
// Write OK, await message from peer node.
build_data_vector((char *) &clusteringVersion, sizeof(clusteringVersion), true);
cluster_connect_state = ClusterHandler::CLCON_READ_MSG;
break;
} else {
// Delay and check for i/o completion
eventProcessor.schedule_in(this, CLUSTER_PERIOD, ET_CLUSTER);
return EVENT_DONE;
}
}
////////////////////////////////////////////////////////////////////////////
case ClusterHandler::CLCON_READ_MSG:
////////////////////////////////////////////////////////////////////////////
{
cluster_connect_state = ClusterHandler::CLCON_READ_MSG_COMPLETE;
if (!read.doIO()) {
// i/o not initiated, delay and retry
cluster_connect_state = ClusterHandler::CLCON_READ_MSG;
eventProcessor.schedule_in(this, CLUSTER_PERIOD, ET_CLUSTER);
return EVENT_DONE;
}
break;
}
////////////////////////////////////////////////////////////////////////////
case ClusterHandler::CLCON_READ_MSG_COMPLETE:
////////////////////////////////////////////////////////////////////////////
{
if (read.io_complete) {
if (read.io_complete < 0) {
// Read error, abort connect
cluster_connect_state = ClusterHandler::CLCON_ABORT_CONNECT;
break; // goto next state
}
if ((size_t) read.did < sizeof(clusteringVersion)) {
// Partial read, resume read.
cluster_connect_state = ClusterHandler::CLCON_READ_MSG;
break;
}
cluster_connect_state = ClusterHandler::CLCON_VALIDATE_MSG;
break;
} else {
// Delay and check for i/o completion
eventProcessor.schedule_in(this, CLUSTER_PERIOD, ET_CLUSTER);
return EVENT_DONE;
}
}
////////////////////////////////////////////////////////////////////////////
case ClusterHandler::CLCON_VALIDATE_MSG:
////////////////////////////////////////////////////////////////////////////
{
int proto_major = -1;
int proto_minor = -1;
clusteringVersion.AdjustByteOrder();
/////////////////////////////////////////////////////////////////////////
// Determine the message protocol major version to use by stepping down
// from the peer's current major to its minimum until a version within
// this node's supported range is found. Derive the minor number as
// follows: if the chosen major equals the peer's current major, use the
// peer's minor number; otherwise the minor number is zero.
/////////////////////////////////////////////////////////////////////////
for (int major = clusteringVersion._major; major >= clusteringVersion._min_major; --major) {
if ((major >= nodeClusteringVersion._min_major) && (major <= nodeClusteringVersion._major)) {
proto_major = major;
break; // stop at the highest mutually supported major
}
}
if (proto_major > 0) {
///////////////////////////
// Compute minor version
///////////////////////////
if (proto_major == clusteringVersion._major) {
proto_minor = clusteringVersion._minor;
if (proto_minor != nodeClusteringVersion._minor)
Warning("Different clustering minor versions (%d,%d) for node %u.%u.%u.%u, continuing",
proto_minor, nodeClusteringVersion._minor, DOT_SEPARATED(ip));
} else {
proto_minor = 0;
}
} else {
Warning("Bad cluster major version range (%d-%d) for node %u.%u.%u.%u connect failed",
clusteringVersion._min_major, clusteringVersion._major, DOT_SEPARATED(ip));
cluster_connect_state = ClusterHandler::CLCON_ABORT_CONNECT;
break; // goto next state
}
#ifdef LOCAL_CLUSTER_TEST_MODE
port = clusteringVersion._port & 0xffff;
#endif
if (!connector)
id = clusteringVersion._id & 0xffff;
machine->msg_proto_major = proto_major;
machine->msg_proto_minor = proto_minor;
if (eventProcessor.n_threads_for_type[ET_CLUSTER] != num_of_cluster_threads) {
cluster_connect_state = ClusterHandler::CLCON_ABORT_CONNECT;
break;
}
thread = eventProcessor.eventthread[ET_CLUSTER][id % num_of_cluster_threads];
if (net_vc->thread == thread) {
cluster_connect_state = CLCON_CONN_BIND_OK;
break;
} else {
cluster_connect_state = ClusterHandler::CLCON_CONN_BIND_CLEAR;
}
}
case ClusterHandler::CLCON_CONN_BIND_CLEAR:
{
UnixNetVConnection *vc = (UnixNetVConnection *)net_vc;
MUTEX_TRY_LOCK(lock, vc->nh->mutex, e->ethread);
MUTEX_TRY_LOCK(lock1, vc->mutex, e->ethread);
if (lock && lock1) {
vc->ep.stop();
vc->nh->open_list.remove(vc);
vc->thread = NULL;
if (vc->nh->read_ready_list.in(vc))
vc->nh->read_ready_list.remove(vc);
if (vc->nh->write_ready_list.in(vc))
vc->nh->write_ready_list.remove(vc);
if (vc->read.in_enabled_list)
vc->nh->read_enable_list.remove(vc);
if (vc->write.in_enabled_list)
vc->nh->write_enable_list.remove(vc);
// CLCON_CONN_BIND is handled on the bind target thread (that thread's NetHandler)
cluster_connect_state = ClusterHandler::CLCON_CONN_BIND;
thread->schedule_in(this, CLUSTER_PERIOD);
return EVENT_DONE;
} else {
// CLCON_CONN_BIND_CLEAR is retried on the origin vc->thread (that thread's NetHandler)
vc->thread->schedule_in(this, CLUSTER_PERIOD);
return EVENT_DONE;
}
}
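// The two bind states migrate net_vc between event threads: under both
// the NetHandler and VC locks, CLCON_CONN_BIND_CLEAR unhooks the VC from
// its origin thread's NetHandler lists, then CLCON_CONN_BIND re-registers
// it with the cluster thread's NetHandler and poll descriptor.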
case ClusterHandler::CLCON_CONN_BIND:
{
NetHandler *nh = get_NetHandler(e->ethread);
UnixNetVConnection *vc = (UnixNetVConnection *)net_vc;
MUTEX_TRY_LOCK(lock, nh->mutex, e->ethread);
MUTEX_TRY_LOCK(lock1, vc->mutex, e->ethread);
if (lock && lock1) {
if (vc->read.in_enabled_list)
nh->read_enable_list.push(vc);
if (vc->write.in_enabled_list)
nh->write_enable_list.push(vc);
vc->nh = nh;
vc->thread = e->ethread;
PollDescriptor *pd = get_PollDescriptor(e->ethread);
if (vc->ep.start(pd, vc, EVENTIO_READ|EVENTIO_WRITE) < 0) {
cluster_connect_state = ClusterHandler::CLCON_DELETE_CONNECT;
break; // goto next state
}
nh->open_list.enqueue(vc);
cluster_connect_state = ClusterHandler::CLCON_CONN_BIND_OK;
} else {
thread->schedule_in(this, CLUSTER_PERIOD);
return EVENT_DONE;
}
}
case ClusterHandler::CLCON_CONN_BIND_OK:
{
int failed = 0;
// include this node into the cluster configuration
MUTEX_TAKE_LOCK(the_cluster_config_mutex, this_ethread());
MachineList *cc = the_cluster_config();
if (cc && cc->find(ip, port)) {
ClusterConfiguration *c = this_cluster()->current_configuration();
ClusterMachine *m = c->find(ip, port);
if (!m) { // this is the first connection from this machine
ClusterConfiguration *cconf = configuration_add_machine(c, machine);
CLUSTER_INCREMENT_DYN_STAT(CLUSTER_NODES_STAT);
this_cluster()->configurations.push(cconf);
} else {
// close the new connection if an old connection already exists
if (id >= m->num_connections || m->clusterHandlers[id]) {
failed = -2;
MUTEX_UNTAKE_LOCK(the_cluster_config_mutex, this_ethread());
goto failed;
}
machine = m;
}
machine->now_connections++;
machine->clusterHandlers[id] = this;
machine->dead = false;
dead = false;
} else {
Debug(CL_NOTE, "cluster connect aborted, machine %u.%u.%u.%u:%d not in cluster", DOT_SEPARATED(ip), port);
failed = -1;
}
MUTEX_UNTAKE_LOCK(the_cluster_config_mutex, this_ethread());
failed:
if (failed) {
if (failed == -1) {
if (++configLookupFails <= CONFIG_LOOKUP_RETRIES) {
thread->schedule_in(this, CLUSTER_PERIOD);
return EVENT_DONE;
}
}
cluster_connect_state = ClusterHandler::CLCON_DELETE_CONNECT;
break; // goto next state
}
this->needByteSwap = !clusteringVersion.NativeByteOrder();
machine_online_APIcallout(ip);
// Signal the manager
snprintf(textbuf, sizeof(textbuf), "%hhu.%hhu.%hhu.%hhu:%d", DOT_SEPARATED(ip), port);
REC_SignalManager(REC_SIGNAL_MACHINE_UP, textbuf);
#ifdef LOCAL_CLUSTER_TEST_MODE
Note("machine up %hhu.%hhu.%hhu.%hhu:%d, protocol version=%d.%d",
DOT_SEPARATED(ip), port, clusteringVersion._major, clusteringVersion._minor);
#else
Note("machine up %hhu.%hhu.%hhu.%hhu:%d, protocol version=%d.%d",
DOT_SEPARATED(ip), id, clusteringVersion._major, clusteringVersion._minor);
#endif
read_vcs = NEW((new Queue<ClusterVConnectionBase, ClusterVConnectionBase::Link_read_link>[CLUSTER_BUCKETS]));
write_vcs = NEW((new Queue<ClusterVConnectionBase, ClusterVConnectionBase::Link_write_link>[CLUSTER_BUCKETS]));
SET_HANDLER((ClusterContHandler) & ClusterHandler::beginClusterEvent);
// enable schedule_imm() on i/o completion (optimization)
read.do_iodone_event = true;
write.do_iodone_event = true;
cluster_periodic_event = thread->schedule_every(this, -CLUSTER_PERIOD);
// Start the periodic events that process entries in
// external_incoming_control.
int procs_online = ink_number_of_processors();
int total_callbacks = min(procs_online, MAX_COMPLETION_CALLBACK_EVENTS);
for (int n = 0; n < total_callbacks; ++n) {
callout_cont[n] = NEW(new ClusterCalloutContinuation(this));
callout_events[n] = eventProcessor.schedule_every(callout_cont[n], COMPLETION_CALLBACK_PERIOD, ET_NET);
}
// Start cluster interconnect load monitoring
if (!clm) {
clm = new ClusterLoadMonitor(this);
clm->init();
}
return EVENT_DONE;
}
////////////////////////////////////////////////////////////////////////////
case ClusterHandler::CLCON_ABORT_CONNECT:
////////////////////////////////////////////////////////////////////////////
{
if (connector) {
Debug(CL_NOTE, "cluster connect retry for %u.%u.%u.%u", DOT_SEPARATED(ip));
// check for duplicate cluster connect
clusterProcessor.connect(ip, port, id, true);
}
cluster_connect_state = ClusterHandler::CLCON_DELETE_CONNECT;
break; // goto next state
}
////////////////////////////////////////////////////////////////////////////
case ClusterHandler::CLCON_DELETE_CONNECT:
////////////////////////////////////////////////////////////////////////////
{
// No references possible, so just delete it.
Debug(CL_NOTE, "Failed cluster connect, deleting");
delete machine;
machine = NULL;
delete this;
return EVENT_DONE;
}
////////////////////////////////////////////////////////////////////////////
default:
////////////////////////////////////////////////////////////////////////////
{
Warning("startClusterEvent invalid state %d", cluster_connect_state);
ink_release_assert(!"ClusterHandler::startClusterEvent invalid state");
return EVENT_DONE;
}
} // End of switch
} // End of for
return EVENT_DONE;
}
int
ClusterHandler::beginClusterEvent(int /* event ATS_UNUSED */, Event * e)
{
// Establish the main periodic Cluster event
#ifdef CLUSTER_IMMEDIATE_NETIO
build_poll(false);
#endif
SET_HANDLER((ClusterContHandler) & ClusterHandler::mainClusterEvent);
return handleEvent(EVENT_INTERVAL, e);
}
int
ClusterHandler::zombieClusterEvent(int event, Event * e)
{
//
// The ZOMBIE state is entered when the handler may still be referenced
// by short-running tasks (one scheduling quantum). The object is deleted
// only after an unreasonably long (in comparison) delay.
//
(void) event;
(void) e;
delete this; // I am out of here
return EVENT_DONE;
}
int
ClusterHandler::protoZombieEvent(int /* event ATS_UNUSED */, Event * e)
{
//
// Node associated with *this is declared down.
// After cleanup is complete, setup handler to delete *this
// after NO_RACE_DELAY
//
bool failed = false;
ink_hrtime delay = CLUSTER_MEMBER_DELAY * 5;
EThread *t = e ? e->ethread : this_ethread();
head_p item;
/////////////////////////////////////////////////////////////////
// Complete pending i/o operations
/////////////////////////////////////////////////////////////////
mainClusterEvent(EVENT_INTERVAL, e);
item.data = external_incoming_open_local.head.data;
if (TO_PTR(FREELIST_POINTER(item)) ||
delayed_reads.head || pw_write_descriptors_built
|| pw_freespace_descriptors_built || pw_controldata_descriptors_built) {
// Operations still pending, retry later
if (e) {
e->schedule_in(delay);
return EVENT_CONT;
} else {
eventProcessor.schedule_in(this, delay, ET_CLUSTER);
return EVENT_DONE;
}
}
///////////////////////////////////////////////////////////////
// Deallocate current read control data
///////////////////////////////////////////////////////////////
IncomingControl *ic;
while ((ic = incoming_control.dequeue())) {
failed = true;
ic->mutex = NULL;
ic->freeall();
}
/////////////////////////////////////////////////////////////////
// Post error completion on all active read/write VC(s) and
// deallocate closed VC(s).
/////////////////////////////////////////////////////////////////
for (int i = 0; i < n_channels; i++) {
ClusterVConnection *vc = channels[i];
if (VALID_CHANNEL(vc)) {
if (!vc->closed && vc->read.vio.op == VIO::READ) {
MUTEX_TRY_LOCK(lock, vc->read.vio.mutex, t);
if (lock) {
cluster_signal_error_and_update(vc, &vc->read, 0);
} else {
failed = true;
}
}
vc = channels[i];
if (VALID_CHANNEL(vc)
&& !vc->closed && vc->write.vio.op == VIO::WRITE) {
MUTEX_TRY_LOCK(lock, vc->write.vio.mutex, t);
if (lock) {
cluster_signal_error_and_update(vc, &vc->write, 0);
} else {
failed = true;
}
}
vc = channels[i];
if (VALID_CHANNEL(vc)) {
if (vc->closed) {
vc->ch = 0;
vc->write_list = 0;
vc->write_list_tail = 0;
vc->write_list_bytes = 0;
vc->write_bytes_in_transit = 0;
close_ClusterVConnection(vc);
} else {
failed = true;
}
}
}
}
///////////////////////////////////////////////////////////////
// Empty the external_incoming_control queue before aborting
// the completion callbacks.
///////////////////////////////////////////////////////////////
item.data = external_incoming_control.head.data;
if (TO_PTR(FREELIST_POINTER(item)) == NULL) {
for (int n = 0; n < MAX_COMPLETION_CALLBACK_EVENTS; ++n) {
if (callout_cont[n]) {
MUTEX_TRY_LOCK(lock, callout_cont[n]->mutex, t);
if (lock) {
callout_events[n]->cancel(callout_cont[n]);
callout_events[n] = 0;
delete callout_cont[n];
callout_cont[n] = 0;
} else {
failed = true;
}
}
}
} else {
failed = true;
}
if (!failed) {
Debug("cluster_down", "ClusterHandler zombie [%u.%u.%u.%u]", DOT_SEPARATED(ip));
SET_HANDLER((ClusterContHandler) & ClusterHandler::zombieClusterEvent);
delay = NO_RACE_DELAY;
}
if (e) {
e->schedule_in(delay);
return EVENT_CONT;
} else {
eventProcessor.schedule_in(this, delay, ET_CLUSTER);
return EVENT_DONE;
}
}
int dump_verbose = 0;
int
ClusterHandler::compute_active_channels()
{
ClusterHandler *ch = this;
int active_chans = 0;
for (int i = LAST_DEDICATED_CHANNEL + 1; i < ch->n_channels; i++) {
ClusterVConnection *vc = ch->channels[i];
if (VALID_CHANNEL(vc) && (vc->iov_map != CLUSTER_IOV_NOT_OPEN)) {
++active_chans;
if (dump_verbose) {
printf("ch[%d] vc=0x%p remote_free=%d last_local_free=%d\n", i, vc,
vc->remote_free, vc->last_local_free);
printf(" r_bytes=%d r_done=%d w_bytes=%d w_done=%d\n",
(int)vc->read.vio.nbytes, (int)vc->read.vio.ndone,
(int)vc->write.vio.nbytes, (int)vc->write.vio.ndone);
}
}
}
return active_chans;
}
void
ClusterHandler::dump_internal_data()
{
if (!message_blk) {
message_blk = new_IOBufferBlock();
message_blk->alloc(MAX_IOBUFFER_SIZE);
}
int r;
int n = 0;
char *b = message_blk->data->data();
unsigned int b_size = message_blk->data->block_size();
r = snprintf(&b[n], b_size - n, "Host: %hhu.%hhu.%hhu.%hhu\n", DOT_SEPARATED(ip));
n += r;
r = snprintf(&b[n], b_size - n,
"chans: %d vc_writes: %" PRId64 " write_bytes: %" PRId64 "(d)+%" PRId64 "(c)=%" PRId64 "\n",
compute_active_channels(),
_vc_writes, _vc_write_bytes, _control_write_bytes, _vc_write_bytes + _control_write_bytes);
n += r;
r = snprintf(&b[n], b_size - n,
"dw: missed_lock: %d not_enabled: %d wait_remote_fill: %d no_active_vio: %d\n",
_dw_missed_lock, _dw_not_enabled, _dw_wait_remote_fill, _dw_no_active_vio);
n += r;
r = snprintf(&b[n], b_size - n,
"dw: not_enabled_or_no_write: %d set_data_pending: %d no_free_space: %d\n",
_dw_not_enabled_or_no_write, _dw_set_data_pending, _dw_no_free_space);
n += r;
r = snprintf(&b[n], b_size - n,
"fw: missed_lock: %d not_enabled: %d wait_remote_fill: %d no_active_vio: %d\n",
_fw_missed_lock, _fw_not_enabled, _fw_wait_remote_fill, _fw_no_active_vio);
n += r;
r = snprintf(&b[n], b_size - n, "fw: not_enabled_or_no_read: %d\n", _fw_not_enabled_or_no_read);
n += r;
r = snprintf(&b[n], b_size - n,
"rd(%d): st:%d rh:%d ahd:%d sd:%d rd:%d ad:%d sda:%d rda:%d awd:%d p:%d c:%d\n",
_process_read_calls, _n_read_start, _n_read_header, _n_read_await_header,
_n_read_setup_descriptor, _n_read_descriptor, _n_read_await_descriptor,
_n_read_setup_data, _n_read_data, _n_read_await_data, _n_read_post_complete, _n_read_complete);
n += r;
r = snprintf(&b[n], b_size - n,
"wr(%d): st:%d set:%d ini:%d wait:%d post:%d comp:%d\n",
_process_write_calls, _n_write_start, _n_write_setup, _n_write_initiate,
_n_write_await_completion, _n_write_post_complete, _n_write_complete);
n += r;
ink_release_assert((n + 1) <= BUFFER_SIZE_FOR_INDEX(MAX_IOBUFFER_SIZE));
Note("%s", b);
clear_cluster_stats();
}
void
ClusterHandler::dump_write_msg(int res)
{
// Debug support for inter-cluster message tracing
Alias32 x;
x.u32 = (uint32_t) ((struct sockaddr_in *)(net_vc->get_remote_addr()))->sin_addr.s_addr;
fprintf(stderr,
"[W] %hhu.%hhu.%hhu.%hhu SeqNo=%u, Cnt=%d, CntlCnt=%d Todo=%d, Res=%d\n",
x.byte[0], x.byte[1], x.byte[2], x.byte[3], write.sequence_number, write.msg.count, write.msg.control_bytes, write.to_do, res);
for (int i = 0; i < write.msg.count; ++i) {
fprintf(stderr, " d[%i] Type=%d, Chan=%d, SeqNo=%d, Len=%u\n",
i, (write.msg.descriptor[i].type ? 1 : 0),
(int) write.msg.descriptor[i].channel,
(int) write.msg.descriptor[i].sequence_number, write.msg.descriptor[i].length);
}
}
void
ClusterHandler::dump_read_msg()
{
// Debug support for inter-cluster message tracing
Alias32 x;
x.u32 = (uint32_t) ((struct sockaddr_in *)(net_vc->get_remote_addr()))->sin_addr.s_addr;
fprintf(stderr, "[R] %hhu.%hhu.%hhu.%hhu SeqNo=%u, Cnt=%d, CntlCnt=%d\n",
x.byte[0], x.byte[1], x.byte[2], x.byte[3], read.sequence_number, read.msg.count, read.msg.control_bytes);
for (int i = 0; i < read.msg.count; ++i) {
fprintf(stderr, " d[%i] Type=%d, Chan=%d, SeqNo=%d, Len=%u\n",
i, (read.msg.descriptor[i].type ? 1 : 0),
(int) read.msg.descriptor[i].channel,
(int) read.msg.descriptor[i].sequence_number, read.msg.descriptor[i].length);
}
}
// End of ClusterHandlerBase.cc