| /** @file |
| |
| ClusterHandler implementation: node-to-node cluster message read/write processing |
| |
| @section license License |
| |
| Licensed to the Apache Software Foundation (ASF) under one |
| or more contributor license agreements. See the NOTICE file |
| distributed with this work for additional information |
| regarding copyright ownership. The ASF licenses this file |
| to you under the Apache License, Version 2.0 (the |
| "License"); you may not use this file except in compliance |
| with the License. You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, software |
| distributed under the License is distributed on an "AS IS" BASIS, |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| See the License for the specific language governing permissions and |
| limitations under the License. |
| */ |
| |
| |
| /**************************************************************************** |
| |
| ClusterHandler.cc |
| ****************************************************************************/ |
| |
| #define DEFINE_CLUSTER_FUNCTIONS |
| #include "P_Cluster.h" |
| |
| /*************************************************************************/ |
| // Global Data |
| /*************************************************************************/ |
| // Number of entries in clusterFunction[] |
| unsigned SIZE_clusterFunction = countof(clusterFunction); |
| |
| // hook for testing |
| ClusterFunction *ptest_ClusterFunction = NULL; |
| |
| // global bit buckets for closed channels |
| static char channel_dummy_input[DEFAULT_MAX_BUFFER_SIZE]; |
| char channel_dummy_output[DEFAULT_MAX_BUFFER_SIZE]; |
| |
| // outgoing control continuations |
| ClassAllocator<OutgoingControl> outControlAllocator("outControlAllocator"); |
| |
| // incoming control descriptors |
| ClassAllocator<IncomingControl> inControlAllocator("inControlAllocator"); |
| |
| static int dump_msgs = 0; |
| |
| ///////////////////////////////////////// |
| // VERIFY_PETERS_DATA support code |
| ///////////////////////////////////////// |
| #ifdef VERIFY_PETERS_DATA |
| #define DO_VERIFY_PETERS_DATA(_p,_l) verify_peters_data(_p,_l) |
| #else |
| #define DO_VERIFY_PETERS_DATA(_p,_l) |
| #endif |
| |
| void |
| verify_peters_data(char *ap, int l) |
| { |
| unsigned char *p = (unsigned char *) ap; |
| for (int i = 0; i < l - 1; i++) { |
| unsigned char x1 = p[i]; |
| unsigned char x2 = p[i + 1]; |
| x1 += 1; |
| if (x1 != x2) { |
| fprintf(stderr, "verify peter's data failed at %d\n", i); |
| break; |
| } |
| } |
| } |
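| |
| // A buffer passes the check above when each byte is one greater (mod 256) |
| // than the byte before it. A minimal sketch of a matching filler follows; |
| // it is illustrative only (not compiled, not referenced elsewhere): |
| #if 0 |
| static void |
| fill_peters_data(char *ap, int l) |
| { |
|   unsigned char *p = (unsigned char *) ap; |
|   for (int i = 0; i < l; i++) |
|     p[i] = (unsigned char) i;   // consecutive values, wraps naturally at 256 |
| } |
| #endif |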
| |
| /*************************************************************************/ |
| // ClusterHandler member functions (Internal Class) |
| /*************************************************************************/ |
| // |
| // Overview: |
| // In a steady state cluster environment, all cluster nodes have an |
| // established TCP socket connection to each node in the cluster. |
| // An instance of the class ClusterHandler exists for each known node |
| // in the cluster. All specific node-node data/state is encapsulated |
| // by this class. |
| // |
| // ClusterHandler::mainClusterEvent() is the key periodic event which |
| // drives the read/write action over the node-node socket connection. |
| // A high level overview of ClusterHandler::mainClusterEvent() action is |
| // as follows: |
| // 1) Perform cluster interconnect load monitoring functions. |
| // If interconnect is overloaded, convert all remote cluster |
| // operations to proxy only. |
| // 2) Process delayed reads. Delayed read refers to data associated |
| // with a VC (Virtual Connection) which resides in an intermediate |
| // buffer and is unknown to the VC. This is required in cases |
| // where we are unable to acquire the VC mutex at the time of the |
| // read from the node-node socket. Delayed read processing |
| // consists of acquiring the VC mutex and moving the data into the |
| // VC and posting read completion. |
| // 3) Process pending read data on the node-node TCP socket. In the |
| // typical case, read processing is performed using three read |
| // operations. The actions are as follows: |
| // a) Read the fixed-size message header |
| // (struct ClusterMsgHeader) consisting of the |
| // number of data descriptors and the size of the inline |
| // control messages following the data descriptors. |
| // b) Setup buffer for data descriptors and inline control |
| // messages and issue read. |
| // c) Setup read buffers and acquire applicable locks for |
| // VC/Control data described by data descriptors and issue |
| // read. |
| // d) Perform read completion actions on control and VC data. |
| // e) Free VC locks |
| // 4) Process write bank data. Write bank data is outstanding data |
| // which we were unable to push out in the last write over the |
| // node-node TCP socket. Write bank data must be successfully pushed |
| // before performing any additional write processing. |
| // 5) Build a write message consisting of the following data: |
| // 1) Write data for a Virtual Connection in the current write data |
| // bucket (write_vcs) |
| // 2) Virtual Connection free space for VCs in the current read |
| // data bucket (read_vcs) |
| // 3) Control message data (outgoing_control) |
| // 6) Push write data |
| // |
| // Thread stealing refers to executing the control message processing |
| // portion of mainClusterEvent() by a thread not associated with the |
| // periodic event. This is a mechanism to avoid the latency on control |
| // messages by allowing them to be pushed immediately. |
| // |
| /*************************************************************************/ |
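| |
| // As a rough guide to the read phases listed above, build_initial_vector() |
| // keys its setup off read.msg.state; the mapping below is a sketch inferred |
| // from the code in this file, not a separate specification: |
| // |
| //   msg.state == 0  -> phase (a): read struct ClusterMsgHeader |
| //   msg.state == 1  -> phase (b): read descriptor[count] + inline control bytes |
| //   msg.state >= 2  -> phase (c): read the VC/control data the descriptors describe |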
| |
| ClusterHandler::ClusterHandler() |
| : net_vc(0), |
| thread(0), |
| ip(0), |
| port(0), |
| hostname(NULL), |
| machine(NULL), |
| ifd(-1), |
| id(-1), |
| dead(true), |
| downing(false), |
| active(false), |
| on_stolen_thread(false), |
| n_channels(0), |
| channels(NULL), |
| channel_data(NULL), |
| connector(false), |
| cluster_connect_state(ClusterHandler::CLCON_INITIAL), |
| needByteSwap(false), |
| configLookupFails(0), |
| cluster_periodic_event(0), |
| read(this, true), |
| write(this, false), |
| current_time(0), |
| last(0), |
| last_report(0), |
| n_since_last_report(0), |
| last_cluster_op_enable(0), |
| last_trace_dump(0), |
| clm(0), |
| disable_remote_cluster_ops(0), |
| pw_write_descriptors_built(0), |
| pw_freespace_descriptors_built(0), |
| pw_controldata_descriptors_built(0), |
| pw_time_expired(0), |
| started_on_stolen_thread(false), |
| control_message_write(false) |
| #ifdef CLUSTER_STATS |
| , |
| _vc_writes(0), |
| _vc_write_bytes(0), |
| _control_write_bytes(0), |
| _dw_missed_lock(0), |
| _dw_not_enabled(0), |
| _dw_wait_remote_fill(0), |
| _dw_no_active_vio(0), |
| _dw_not_enabled_or_no_write(0), |
| _dw_set_data_pending(0), |
| _dw_no_free_space(0), |
| _fw_missed_lock(0), |
| _fw_not_enabled(0), |
| _fw_wait_remote_fill(0), |
| _fw_no_active_vio(0), |
| _fw_not_enabled_or_no_read(0), |
| _process_read_calls(0), |
| _n_read_start(0), |
| _n_read_header(0), |
| _n_read_await_header(0), |
| _n_read_setup_descriptor(0), |
| _n_read_descriptor(0), |
| _n_read_await_descriptor(0), |
| _n_read_setup_data(0), |
| _n_read_data(0), |
| _n_read_await_data(0), |
| _n_read_post_complete(0), |
| _n_read_complete(0), |
| _process_write_calls(0), |
| _n_write_start(0), |
| _n_write_setup(0), |
| _n_write_initiate(0), |
| _n_write_await_completion(0), |
| _n_write_post_complete(0), |
| _n_write_complete(0) |
| #endif |
| { |
| #ifdef MSG_TRACE |
| t_fd = fopen("msgtrace.log", "w"); |
| #endif |
| // we need to lead by at least 1 |
| |
| min_priority = 1; |
| SET_HANDLER((ClusterContHandler) &ClusterHandler::startClusterEvent); |
| |
| mutex = new_ProxyMutex(); |
| OutgoingControl oc; |
| int n; |
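| // Note: the pointer arithmetic below computes the same value as |
| // offsetof(OutgoingControl, link.next); the ready-queue initializers |
| // further down use offsetof() directly for the same purpose. |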
| for (n = 0; n < CLUSTER_CMSG_QUEUES; ++n) { |
| ink_atomiclist_init(&outgoing_control_al[n], "OutGoingControlQueue", (char *) &oc.link.next - (char *) &oc); |
| } |
| |
| IncomingControl ic; |
| ink_atomiclist_init(&external_incoming_control, |
| "ExternalIncomingControlQueue", (char *) &ic.link.next - (char *) &ic); |
| |
| ClusterVConnection ivc; |
| ink_atomiclist_init(&external_incoming_open_local, |
| "ExternalIncomingOpenLocalQueue", (char *) &ivc.link.next - (char *) &ivc); |
| ink_atomiclist_init(&read_vcs_ready, "ReadVcReadyQueue", offsetof(ClusterVConnection, ready_alink.next)); |
| ink_atomiclist_init(&write_vcs_ready, "WriteVcReadyQueue", offsetof(ClusterVConnection, ready_alink.next)); |
| memset((char *) &callout_cont[0], 0, sizeof(callout_cont)); |
| memset((char *) &callout_events[0], 0, sizeof(callout_events)); |
| } |
| |
| ClusterHandler::~ClusterHandler() |
| { |
| bool free_m = false; |
| if (net_vc) { |
| net_vc->do_io(VIO::CLOSE); |
| net_vc = 0; |
| } |
| if (machine) { |
| MUTEX_TAKE_LOCK(the_cluster_config_mutex, this_ethread()); |
| if (++machine->free_connections >= machine->num_connections) |
| free_m = true; |
| MUTEX_UNTAKE_LOCK(the_cluster_config_mutex, this_ethread()); |
| if (free_m) |
| free_ClusterMachine(machine); |
| } |
| machine = NULL; |
| ats_free(hostname); |
| hostname = NULL; |
| ats_free(channels); |
| channels = NULL; |
| if (channel_data) { |
| for (int i = 0; i < n_channels; ++i) { |
| if (channel_data[i]) { |
| ats_free(channel_data[i]); |
| channel_data[i] = 0; |
| } |
| } |
| ats_free(channel_data); |
| channel_data = NULL; |
| } |
| if (read_vcs) |
| delete[] read_vcs; |
| read_vcs = NULL; |
| |
| if (write_vcs) |
| delete[] write_vcs; |
| write_vcs = NULL; |
| |
| if (clm) { |
| delete clm; |
| clm = NULL; |
| } |
| #ifdef CLUSTER_STATS |
| message_blk = 0; |
| #endif |
| } |
| |
| void |
| ClusterHandler::close_ClusterVConnection(ClusterVConnection * vc) |
| { |
| // |
| // Close down a ClusterVConnection |
| // |
| if (vc->inactivity_timeout) |
| vc->inactivity_timeout->cancel(vc); |
| if (vc->active_timeout) |
| vc->active_timeout->cancel(vc); |
| if (vc->read.queue) |
| ClusterVC_remove_read(vc); |
| if (vc->write.queue) |
| ClusterVC_remove_write(vc); |
| vc->read.vio.mutex = NULL; |
| vc->write.vio.mutex = NULL; |
| |
| ink_assert(!vc->read_locked); |
| ink_assert(!vc->write_locked); |
| int channel = vc->channel; |
| free_channel(vc); |
| |
| if (vc->byte_bank_q.head) { |
| delayed_reads.remove(vc); |
| |
| // Deallocate byte bank descriptors |
| ByteBankDescriptor *d; |
| while ((d = vc->byte_bank_q.dequeue())) { |
| ByteBankDescriptor::ByteBankDescriptor_free(d); |
| } |
| } |
| vc->read_block = 0; |
| |
| ink_assert(!vc->write_list); |
| ink_assert(!vc->write_list_tail); |
| ink_assert(!vc->write_list_bytes); |
| ink_assert(!vc->write_bytes_in_transit); |
| |
| if (((!vc->remote_closed && !vc->have_all_data) || (vc->remote_closed == FORCE_CLOSE_ON_OPEN_CHANNEL)) && vc->ch) { |
| |
| CloseMessage msg; |
| int vers = CloseMessage::protoToVersion(vc->ch->machine->msg_proto_major); |
| void *data; |
| int len; |
| |
| if (vers == CloseMessage::CLOSE_CHAN_MESSAGE_VERSION) { |
| msg.channel = channel; |
| msg.status = (vc->remote_closed == FORCE_CLOSE_ON_OPEN_CHANNEL) ? FORCE_CLOSE_ON_OPEN_CHANNEL : vc->closed; |
| msg.lerrno = vc->lerrno; |
| msg.sequence_number = vc->token.sequence_number; |
| data = (void *) &msg; |
| len = sizeof(CloseMessage); |
| |
| } else { |
| ////////////////////////////////////////////////////////////// |
| // Create the specified down rev version of this message |
| ////////////////////////////////////////////////////////////// |
| ink_release_assert(!"close_ClusterVConnection() bad msg version"); |
| } |
| clusterProcessor.invoke_remote(vc->ch, CLOSE_CHANNEL_CLUSTER_FUNCTION, data, len); |
| } |
| ink_hrtime now = ink_get_hrtime(); |
| CLUSTER_DECREMENT_DYN_STAT(CLUSTER_CONNECTIONS_OPEN_STAT); |
| CLUSTER_SUM_DYN_STAT(CLUSTER_CON_TOTAL_TIME_STAT, now - vc->start_time); |
| if (!local_channel(channel)) { |
| CLUSTER_SUM_DYN_STAT(CLUSTER_REMOTE_CONNECTION_TIME_STAT, now - vc->start_time); |
| } else { |
| CLUSTER_SUM_DYN_STAT(CLUSTER_LOCAL_CONNECTION_TIME_STAT, now - vc->start_time); |
| } |
| clusterVCAllocator_free(vc); |
| } |
| |
| inline bool |
| ClusterHandler::vc_ok_write(ClusterVConnection * vc) |
| { |
| return (((vc->closed > 0) |
| && (vc->write_list || vc->write_bytes_in_transit)) || |
| (!vc->closed && vc->write.enabled && vc->write.vio.op == VIO::WRITE && vc->write.vio.buffer.writer())); |
| } |
| |
| inline bool |
| ClusterHandler::vc_ok_read(ClusterVConnection * vc) |
| { |
| return (!vc->closed && vc->read.vio.op == VIO::READ && vc->read.vio.buffer.writer()); |
| } |
| |
| void |
| ClusterHandler::close_free_lock(ClusterVConnection * vc, ClusterVConnState * s) |
| { |
| Ptr<ProxyMutex> m(s->vio.mutex); |
| if (s == &vc->read) { |
| if ((ProxyMutex *) vc->read_locked) |
| MUTEX_UNTAKE_LOCK(vc->read_locked, thread); |
| vc->read_locked = NULL; |
| } else { |
| if ((ProxyMutex *) vc->write_locked) |
| MUTEX_UNTAKE_LOCK(vc->write_locked, thread); |
| vc->write_locked = NULL; |
| } |
| close_ClusterVConnection(vc); |
| } |
| |
| bool ClusterHandler::build_data_vector(char *d, int len, bool read_flag) |
| { |
| // Internal interface to general network i/o facility allowing |
| // single vector read/write to static data buffer. |
| |
| ClusterState & s = (read_flag ? read : write); |
| ink_assert(d); |
| ink_assert(len); |
| ink_assert(s.iov); |
| |
| s.msg.count = 1; |
| s.iov[0].iov_base = 0; |
| s.iov[0].iov_len = len; |
| s.block[0] = new_IOBufferBlock(); |
| s.block[0]->set(new_constant_IOBufferData(d, len)); |
| |
| if (read_flag) { |
| // Make block write_avail == len |
| s.block[0]->_buf_end = s.block[0]->end() + len; |
| } else { |
| // Make block read_avail == len |
| s.block[0]->fill(len); |
| } |
| |
| s.to_do = len; |
| s.did = 0; |
| s.n_iov = 1; |
| |
| return true; |
| } |
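| |
| // Illustrative use only (hypothetical buffer, not taken from this file): |
| // |
| //   char hdr[sizeof(ClusterMsgHeader)]; |
| //   build_data_vector(hdr, sizeof(hdr), true);  // true selects the read state |
| // |
| // after which the normal socket i/o fills (or drains) the single iovec. |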
| |
| bool ClusterHandler::build_initial_vector(bool read_flag) |
| { |
| // |
| // Build initial read/write struct iovec and corresponding IOBufferData |
| // structures from the given struct descriptor(s). |
| // Required vector adjustments for partial i/o conditions are handled |
| // by adjust_vector(). |
| // |
| /////////////////////////////////////////////////////////////////// |
| // Descriptor to struct iovec layout |
| /////////////////////////////////////////////////////////////////// |
| // Write iovec[] layout |
| // iov[0] ----> struct ClusterMsgHeader |
| // iov[1] ----> struct descriptor [count] |
| // char short_control_messages[control_bytes] |
| // |
| // iov[2] ----> struct descriptor data (element #1) |
| // ...... |
| // iov[2+count] ----> struct descriptor data (element #count) |
| // |
| /////////////////////////////////////////////////////////////////// |
| // Read iovec[] layout phase #1 read |
| // iov[0] ----> struct ClusterMsgHeader |
| /////////////////////////////////////////////////////////////////// |
| // Read iovec[] layout phase #2 read |
| // iov[0] ----> struct descriptor[count] |
| // char short_control_messages[control_bytes] |
| /////////////////////////////////////////////////////////////////// |
| // Read iovec[] layout phase #3 read |
| // iov[0] ----> struct descriptor data (element #1) |
| // ...... |
| // iov[count-1] ----> struct descriptor data (element #count) |
| /////////////////////////////////////////////////////////////////// |
| int i, n; |
| // This isn't used. |
| // MIOBuffer *w; |
| |
| ink_hrtime now = ink_get_hrtime(); |
| ClusterState & s = (read_flag ? read : write); |
| OutgoingControl *oc = s.msg.outgoing_control.head; |
| IncomingControl *ic = incoming_control.head; |
| int new_n_iov = 0; |
| int to_do = 0; |
| int len; |
| |
| ink_assert(s.iov); |
| |
| if (!read_flag) { |
| ////////////////////////////////////////////////////////////////////// |
| // Setup vector for write of header, descriptors and control data |
| ////////////////////////////////////////////////////////////////////// |
| len = sizeof(ClusterMsgHeader) + (s.msg.count * sizeof(Descriptor)) + s.msg.control_bytes; |
| s.iov[new_n_iov].iov_base = 0; |
| s.iov[new_n_iov].iov_len = len; |
| s.block[new_n_iov] = s.msg.get_block_header(); |
| |
| // Make read_avail == len |
| s.block[new_n_iov]->fill(len); |
| |
| to_do += len; |
| ++new_n_iov; |
| |
| } else { |
| if (s.msg.state == 0) { |
| //////////////////////////////////// |
| // Setup vector for read of header |
| //////////////////////////////////// |
| len = sizeof(ClusterMsgHeader); |
| s.iov[new_n_iov].iov_base = 0; |
| s.iov[new_n_iov].iov_len = len; |
| s.block[new_n_iov] = s.msg.get_block_header(); |
| |
| // Make write_avail == len |
| s.block[new_n_iov]->_buf_end = s.block[new_n_iov]->end() + len; |
| |
| to_do += len; |
| ++new_n_iov; |
| |
| } else if (s.msg.state == 1) { |
| ///////////////////////////////////////////////////////// |
| // Setup vector for read of Descriptors+control data |
| ///////////////////////////////////////////////////////// |
| len = (s.msg.count * sizeof(Descriptor)) + s.msg.control_bytes; |
| s.iov[new_n_iov].iov_base = 0; |
| s.iov[new_n_iov].iov_len = len; |
| s.block[new_n_iov] = s.msg.get_block_descriptor(); |
| |
| // Make write_avail == len |
| s.block[new_n_iov]->_buf_end = s.block[new_n_iov]->end() + len; |
| |
| to_do += s.iov[new_n_iov].iov_len; |
| ++new_n_iov; |
| } |
| } |
| |
| //////////////////////////////////////////////////////////// |
| // Build vector for data section of the cluster message. |
| // For read, we only do this if we are in the data phase |
| // of the read (msg.state >= 2). |
| ////////////////////////////////////////////////////////////// |
| // Note: We are assuming that free space descriptors follow |
| // the data descriptors. |
| ////////////////////////////////////////////////////////////// |
| for (i = 0; i < (read_flag ? ((s.msg.state >= 2) ? s.msg.count : 0) |
| : s.msg.count); i++) { |
| if (s.msg.descriptor[i].type == CLUSTER_SEND_DATA) { |
| /////////////////////////////////// |
| // Control channel data |
| /////////////////////////////////// |
| if (s.msg.descriptor[i].channel == CLUSTER_CONTROL_CHANNEL) { |
| if (read_flag) { |
| /////////////////////// |
| // Incoming Control |
| /////////////////////// |
| if (!ic) { |
| ic = IncomingControl::alloc(); |
| ic->recognized_time = now; |
| CLUSTER_INCREMENT_DYN_STAT(CLUSTER_CTRL_MSGS_RECVD_STAT); |
| ic->len = s.msg.descriptor[i].length; |
| ic->alloc_data(); |
| if (!ic->fast_data()) { |
| CLUSTER_INCREMENT_DYN_STAT(CLUSTER_SLOW_CTRL_MSGS_RECVD_STAT); |
| } |
| // Mark message data as invalid |
| *((uint32_t *) ic->data) = UNDEFINED_CLUSTER_FUNCTION; |
| incoming_control.enqueue(ic); |
| } |
| s.iov[new_n_iov].iov_base = 0; |
| s.iov[new_n_iov].iov_len = ic->len; |
| s.block[new_n_iov] = ic->get_block(); |
| to_do += s.iov[new_n_iov].iov_len; |
| ++new_n_iov; |
| ic = (IncomingControl *) ic->link.next; |
| } else { |
| /////////////////////// |
| // Outgoing Control |
| /////////////////////// |
| ink_assert(oc); |
| s.iov[new_n_iov].iov_base = 0; |
| s.iov[new_n_iov].iov_len = oc->len; |
| s.block[new_n_iov] = oc->get_block(); |
| to_do += s.iov[new_n_iov].iov_len; |
| ++new_n_iov; |
| oc = (OutgoingControl *) oc->link.next; |
| } |
| } else { |
| /////////////////////////////// |
| // User channel data |
| /////////////////////////////// |
| ClusterVConnection *vc = channels[s.msg.descriptor[i].channel]; |
| |
| if (VALID_CHANNEL(vc) && |
| (s.msg.descriptor[i].sequence_number) == CLUSTER_SEQUENCE_NUMBER(vc->token.sequence_number)) { |
| if (read_flag) { |
| ink_release_assert(!vc->initial_data_bytes); |
| ///////////////////////////////////// |
| // Try to get the read VIO mutex |
| ///////////////////////////////////// |
| ink_release_assert(!(ProxyMutex *) vc->read_locked); |
| #ifdef CLUSTER_TOMCAT |
| if (!vc->read.vio.mutex || |
| !MUTEX_TAKE_TRY_LOCK_FOR_SPIN(vc->read.vio.mutex, thread, vc->read.vio._cont, READ_LOCK_SPIN_COUNT)) |
| #else |
| if (!MUTEX_TAKE_TRY_LOCK_FOR_SPIN(vc->read.vio.mutex, thread, vc->read.vio._cont, READ_LOCK_SPIN_COUNT)) |
| #endif |
| { |
| vc->read_locked = 0; |
| } else { |
| vc->read_locked = vc->read.vio.mutex; |
| } |
| |
| /////////////////////////////////////// |
| // Allocate read data block |
| /////////////////////////////////////// |
| if (s.msg.descriptor[i].length) { |
| vc->iov_map = new_n_iov; |
| } else { |
| vc->iov_map = CLUSTER_IOV_NONE; |
| } |
| if (vc->pending_remote_fill || vc_ok_read(vc)) { |
| ////////////////////////////////////////////////////////// |
| // Initial and subsequent data on open read channel. |
| // Allocate IOBufferBlock. |
| ////////////////////////////////////////////////////////// |
| ink_release_assert(s.msg.descriptor[i].length <= DEFAULT_MAX_BUFFER_SIZE); |
| vc->read_block = new_IOBufferBlock(); |
| int64_t index = buffer_size_to_index(s.msg.descriptor[i].length, MAX_BUFFER_SIZE_INDEX); |
| vc->read_block->alloc(index); |
| |
| s.iov[new_n_iov].iov_base = 0; |
| s.block[new_n_iov] = vc->read_block->clone(); |
| |
| } else { |
| Debug(CL_NOTE, "dumping cluster read data"); |
| s.iov[new_n_iov].iov_base = 0; |
| s.block[new_n_iov] = new_IOBufferBlock(); |
| s.block[new_n_iov]->set(new_constant_IOBufferData(channel_dummy_input, DEFAULT_MAX_BUFFER_SIZE)); |
| } |
| |
| // Make block write_avail == descriptor[].length |
| s.block[new_n_iov]->_buf_end = s.block[new_n_iov]->end() + s.msg.descriptor[i].length; |
| |
| } else { |
| bool remote_write_fill = (vc->pending_remote_fill && vc->remote_write_block); |
| // Sanity check, assert we have the lock |
| if (!remote_write_fill) { |
| ink_assert((ProxyMutex *) vc->write_locked); |
| } |
| if (vc_ok_write(vc) || remote_write_fill) { |
| if (remote_write_fill) { |
| s.iov[new_n_iov].iov_base = 0; |
| ink_release_assert((int) s.msg.descriptor[i].length == bytes_IOBufferBlockList(vc->remote_write_block, 1)); |
| s.block[new_n_iov] = vc->remote_write_block; |
| |
| } else { |
| s.iov[new_n_iov].iov_base = 0; |
| ink_release_assert((int) s.msg.descriptor[i].length <= vc->write_list_bytes); |
| s.block[new_n_iov] = vc->write_list; |
| vc->write_list = consume_IOBufferBlockList(vc->write_list, (int) s.msg.descriptor[i].length); |
| vc->write_list_bytes -= (int) s.msg.descriptor[i].length; |
| vc->write_bytes_in_transit += (int) s.msg.descriptor[i].length; |
| |
| vc->write_list_tail = vc->write_list; |
| while (vc->write_list_tail && vc->write_list_tail->next) |
| vc->write_list_tail = vc->write_list_tail->next; |
| } |
| } else { |
| Debug(CL_NOTE, "faking cluster write data"); |
| s.iov[new_n_iov].iov_base = 0; |
| s.block[new_n_iov] = new_IOBufferBlock(); |
| s.block[new_n_iov]->set(new_constant_IOBufferData(channel_dummy_output, DEFAULT_MAX_BUFFER_SIZE)); |
| // Make block read_avail == descriptor[].length |
| s.block[new_n_iov]->fill(s.msg.descriptor[i].length); |
| } |
| } |
| } else { |
| // VC has been deleted, need to dump the bits... |
| s.iov[new_n_iov].iov_base = 0; |
| s.block[new_n_iov] = new_IOBufferBlock(); |
| |
| if (read_flag) { |
| s.block[new_n_iov]->set(new_constant_IOBufferData(channel_dummy_input, DEFAULT_MAX_BUFFER_SIZE)); |
| |
| // Make block write_avail == descriptor[].length |
| s.block[new_n_iov]->_buf_end = s.block[new_n_iov]->end() + s.msg.descriptor[i].length; |
| |
| } else { |
| s.block[new_n_iov]->set(new_constant_IOBufferData(channel_dummy_output, DEFAULT_MAX_BUFFER_SIZE)); |
| |
| // Make block read_avail == descriptor[].length |
| s.block[new_n_iov]->fill(s.msg.descriptor[i].length); |
| } |
| } |
| s.iov[new_n_iov].iov_len = s.msg.descriptor[i].length; |
| to_do += s.iov[new_n_iov].iov_len; |
| ++new_n_iov; |
| } |
| } |
| } |
| // Release IOBufferBlock references used in previous i/o operation |
| for (n = new_n_iov; n < MAX_TCOUNT; ++n) { |
| s.block[n] = 0; |
| } |
| |
| // Initialize i/o state variables |
| s.to_do = to_do; |
| s.did = 0; |
| s.n_iov = new_n_iov; |
| return true; |
| |
| // TODO: This is apparently dead code, I added the #if 0 to avoid compiler |
| // warnings, but is this really intentional?? |
| #if 0 |
| // Release all IOBufferBlock references. |
| for (n = 0; n < MAX_TCOUNT; ++n) { |
| s.block[n] = 0; |
| } |
| CLUSTER_INCREMENT_DYN_STAT(CLUSTER_OP_DELAYED_FOR_LOCK_STAT); |
| Debug(CL_WARN, "%s delayed for locks", read_flag ? "read" : "write"); |
| free_locks(read_flag, i); |
| return false; |
| #endif |
| } |
| |
| bool ClusterHandler::get_read_locks() |
| { |
| /////////////////////////////////////////////////////////////////////// |
| // Reacquire locks for the request setup by build_initial_vector(). |
| // We are called after each read completion prior to posting completion |
| /////////////////////////////////////////////////////////////////////// |
| ClusterState & s = read; |
| int i, n; |
| int bytes_processed; |
| int vec_bytes_remainder; |
| int iov_done[MAX_TCOUNT]; |
| |
| memset((char *) iov_done, 0, sizeof(int) * MAX_TCOUNT); |
| |
| // Compute bytes transferred on a per vector basis |
| bytes_processed = s.did - s.bytes_xfered; // not including bytes in this xfer |
| |
| i = -1; |
| for (n = 0; n < s.n_iov; ++n) { |
| bytes_processed -= s.iov[n].iov_len; |
| if (bytes_processed >= 0) { |
| iov_done[n] = s.iov[n].iov_len; |
| } else { |
| iov_done[n] = s.iov[n].iov_len + bytes_processed; |
| if (i < 0) { |
| i = n; // note i/o start vector |
| |
| // Now at vector where last transfer started, |
| // make considerations for the last transfer on this vector. |
| |
| vec_bytes_remainder = (s.iov[n].iov_len - iov_done[n]); |
| bytes_processed = s.bytes_xfered; |
| |
| bytes_processed -= vec_bytes_remainder; |
| if (bytes_processed >= 0) { |
| iov_done[n] = vec_bytes_remainder; |
| } else { |
| iov_done[n] = vec_bytes_remainder + bytes_processed; |
| break; |
| } |
| } else { |
| break; |
| } |
| } |
| } |
| ink_release_assert(i >= 0); |
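| |
|   // Worked example (illustrative values only): with iov lengths {40, 100, 60}, |
|   // s.did == 120 and s.bytes_xfered == 30, the earlier transfers covered |
|   // 90 bytes (all of vector 0 plus 50 bytes of vector 1), so the last |
|   // transfer started in vector 1 and the loop exits with i == 1 and |
|   // iov_done[1] == 30. |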
| |
| // Start lock acquisition at the first vector where we started |
| // the last read. |
| // |
| // Note: We are assuming that free space descriptors follow |
| // the data descriptors. |
| |
| for (; i < s.n_iov; ++i) { |
| if ((s.msg.descriptor[i].type == CLUSTER_SEND_DATA) |
| && (s.msg.descriptor[i].channel != CLUSTER_CONTROL_CHANNEL)) { |
| |
| // Only user channels require locks |
| |
| ClusterVConnection *vc = channels[s.msg.descriptor[i].channel]; |
| if (!VALID_CHANNEL(vc) || |
| ((s.msg.descriptor[i].sequence_number) != |
| CLUSTER_SEQUENCE_NUMBER(vc->token.sequence_number)) || !vc_ok_read(vc)) { |
| // Channel no longer valid, lock not needed since we |
| // already have a reference to the buffer |
| continue; |
| } |
| |
| ink_assert(!(ProxyMutex *) vc->read_locked); |
| vc->read_locked = vc->read.vio.mutex; |
| if (vc->byte_bank_q.head |
| || !MUTEX_TAKE_TRY_LOCK_FOR_SPIN(vc->read.vio.mutex, thread, vc->read.vio._cont, READ_LOCK_SPIN_COUNT)) { |
| // Pending byte bank completions or lock acquire failure. |
| |
| vc->read_locked = NULL; |
| continue; |
| } |
| // Since we now have the mutex, really see if reads are allowed. |
| |
| if (!vc_ok_read(vc)) { |
| MUTEX_UNTAKE_LOCK(vc->read.vio.mutex, thread); |
| vc->read_locked = NULL; |
| continue; |
| } |
| // Lock acquire success, move read bytes into VC |
| |
| int64_t read_avail = vc->read_block->read_avail(); |
| |
| if (!vc->pending_remote_fill && read_avail) { |
| Debug("cluster_vc_xfer", "Deferred fill ch %d %p %" PRId64" bytes", vc->channel, vc, read_avail); |
| |
| vc->read.vio.buffer.writer()->append_block(vc->read_block->clone()); |
| if (complete_channel_read(read_avail, vc)) { |
| vc->read_block->consume(read_avail); |
| } |
| } |
| } |
| } |
| return true; // success |
| } |
| |
| bool ClusterHandler::get_write_locks() |
| { |
| /////////////////////////////////////////////////////////////////////// |
| // Reacquire locks for the request setup by build_initial_vector(). |
| // We are called after the entire write completes prior to |
| // posting completion. |
| /////////////////////////////////////////////////////////////////////// |
| ClusterState & s = write; |
| int i; |
| |
| for (i = 0; i < s.msg.count; ++i) { |
| if ((s.msg.descriptor[i].type == CLUSTER_SEND_DATA) |
| && (s.msg.descriptor[i].channel != CLUSTER_CONTROL_CHANNEL)) { |
| |
| // Only user channels require locks |
| |
| ClusterVConnection *vc = channels[s.msg.descriptor[i].channel]; |
| if (!VALID_CHANNEL(vc) || |
| (s.msg.descriptor[i].sequence_number) != CLUSTER_SEQUENCE_NUMBER(vc->token.sequence_number)) { |
| // Channel no longer valid, lock not needed since we |
| // already have a reference to the buffer |
| continue; |
| } |
| ink_assert(!(ProxyMutex *) vc->write_locked); |
| vc->write_locked = vc->write.vio.mutex; |
| #ifdef CLUSTER_TOMCAT |
| if (vc->write_locked && |
| !MUTEX_TAKE_TRY_LOCK_FOR_SPIN(vc->write.vio.mutex, thread, vc->write.vio._cont, WRITE_LOCK_SPIN_COUNT)) { |
| #else |
| if (!MUTEX_TAKE_TRY_LOCK_FOR_SPIN(vc->write.vio.mutex, thread, vc->write.vio._cont, WRITE_LOCK_SPIN_COUNT)) { |
| #endif |
| // write lock acquire failed, free all acquired locks and retry later |
| vc->write_locked = 0; |
| free_locks(CLUSTER_WRITE, i); |
| return false; |
| } |
| } |
| } |
| return true; |
| } |
| |
| void |
| ClusterHandler::swap_descriptor_bytes() |
| { |
| for (int i = 0; i < read.msg.count; i++) { |
| read.msg.descriptor[i].SwapBytes(); |
| } |
| } |
| |
| void |
| ClusterHandler::process_set_data_msgs() |
| { |
| uint32_t cluster_function_index; |
| // |
| // Cluster set_data messages must always be processed ahead of all |
| // messages and data. By convention, set_data messages (highest priority |
| // messages) always reside at the beginning of the descriptor |
| // and small control message structures. |
| // |
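| // The inline (small) control message layout assumed by the parsing below |
| // is, per message, roughly as follows (a sketch inferred from this file): |
| // |
| //   int32_t  length          // bytes in function code + payload |
| //   uint32_t function index  // e.g. SET_CHANNEL_DATA_CLUSTER_FUNCTION |
| //   char     payload[length - sizeof(uint32_t)] |
| //   <pad to a DOUBLE_ALIGN boundary> |
| // |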
| |
| ///////////////////////////////////////////// |
| // Process small control set_data messages. |
| ///////////////////////////////////////////// |
| if (!read.msg.did_small_control_set_data) { |
| char *p = (char *) &read.msg.descriptor[read.msg.count]; |
| char *endp = p + read.msg.control_bytes; |
| while (p < endp) { |
| if (needByteSwap) { |
| ats_swap32((uint32_t *) p); // length |
| ats_swap32((uint32_t *) (p + sizeof(int32_t))); // function code |
| } |
| int len = *(int32_t *) p; |
| cluster_function_index = *(uint32_t *) (p + sizeof(int32_t)); |
| |
| if ((cluster_function_index < (uint32_t) SIZE_clusterFunction) |
| && (cluster_function_index == SET_CHANNEL_DATA_CLUSTER_FUNCTION)) { |
| clusterFunction[SET_CHANNEL_DATA_CLUSTER_FUNCTION].pfn(this, p + (2 * sizeof(uint32_t)), len - sizeof(uint32_t)); |
| // Mark message as processed. |
| *((uint32_t *) (p + sizeof(uint32_t))) = ~*((uint32_t *) (p + sizeof(uint32_t))); |
| p += (2 * sizeof(uint32_t)) + (len - sizeof(uint32_t)); |
| p = (char *) DOUBLE_ALIGN(p); |
| } else { |
| // Reverse swap since this message will be reprocessed. |
| |
| if (needByteSwap) { |
| ats_swap32((uint32_t *) p); // length |
| ats_swap32((uint32_t *) (p + sizeof(int32_t))); // function code |
| } |
| break; // End of set_data messages |
| } |
| } |
| read.msg.control_data_offset = p - (char *) &read.msg.descriptor[read.msg.count]; |
| read.msg.did_small_control_set_data = 1; |
| } |
| ///////////////////////////////////////////// |
| // Process large control set_data messages. |
| ///////////////////////////////////////////// |
| if (!read.msg.did_large_control_set_data) { |
| IncomingControl *ic = incoming_control.head; |
| |
| while (ic) { |
| if (needByteSwap) { |
| ats_swap32((uint32_t *) ic->data); // function code |
| } |
| cluster_function_index = *((uint32_t *) ic->data); |
| |
| if ((cluster_function_index < (uint32_t) SIZE_clusterFunction) |
| && (cluster_function_index == SET_CHANNEL_DATA_CLUSTER_FUNCTION)) { |
| |
| char *p = ic->data; |
| clusterFunction[SET_CHANNEL_DATA_CLUSTER_FUNCTION].pfn(this, |
| (void *) (p + sizeof(int32_t)), ic->len - sizeof(int32_t)); |
| |
| // Reverse swap since this will be processed again for deallocation. |
| if (needByteSwap) { |
| ats_swap32((uint32_t *) p); // length |
| ats_swap32((uint32_t *) (p + sizeof(int32_t))); // function code |
| } |
| // Mark message as processed. |
| // Defer deallocation until the entire read is complete. |
| *((uint32_t *) p) = ~*((uint32_t *) p); |
| |
| ic = (IncomingControl *) ic->link.next; |
| } else { |
| // Reverse swap since this message will be reprocessed. |
| if (needByteSwap) { |
| ats_swap32((uint32_t *) ic->data); // function code |
| } |
| break; |
| } |
| } |
| read.msg.did_large_control_set_data = 1; |
| } |
| } |
| |
| void |
| ClusterHandler::process_small_control_msgs() |
| { |
| if (read.msg.did_small_control_msgs) { |
| return; |
| } else { |
| read.msg.did_small_control_msgs = 1; |
| } |
| |
| ink_hrtime now = ink_get_hrtime(); |
| char *p = (char *) &read.msg.descriptor[read.msg.count] + read.msg.control_data_offset; |
| char *endp = (char *) &read.msg.descriptor[read.msg.count] + read.msg.control_bytes; |
| |
| while (p < endp) { |
| ///////////////////////////////////////////////////////////////// |
| // Place non cluster small incoming messages on external |
| // incoming queue for processing by callout threads. |
| ///////////////////////////////////////////////////////////////// |
| if (needByteSwap) { |
| ats_swap32((uint32_t *) p); // length |
| ats_swap32((uint32_t *) (p + sizeof(int32_t))); // function code |
| } |
| int len = *(int32_t *) p; |
| p += sizeof(int32_t); |
| uint32_t cluster_function_index = *(uint32_t *) p; |
| ink_release_assert(cluster_function_index != SET_CHANNEL_DATA_CLUSTER_FUNCTION); |
| |
| if (cluster_function_index >= (uint32_t) SIZE_clusterFunction) { |
| Warning("1Bad cluster function index (small control)"); |
| p += len; |
| |
| } else if (clusterFunction[cluster_function_index].ClusterFunc) { |
| ////////////////////////////////////////////////////////////////////// |
| // Cluster function, can only be processed in ET_CLUSTER thread |
| ////////////////////////////////////////////////////////////////////// |
| p += sizeof(uint32_t); |
| clusterFunction[cluster_function_index].pfn(this, p, len - sizeof(int32_t)); |
| p += (len - sizeof(int32_t)); |
| |
| } else { |
| /////////////////////////////////////////////////////// |
| // Non Cluster function, defer to callout threads |
| /////////////////////////////////////////////////////// |
| IncomingControl *ic = IncomingControl::alloc(); |
| ic->recognized_time = now; |
| ic->len = len; |
| ic->alloc_data(); |
| memcpy(ic->data, p, ic->len); |
| SetHighBit(&ic->len); // mark as small cntl |
| ink_atomiclist_push(&external_incoming_control, (void *) ic); |
| p += len; |
| } |
| p = (char *) DOUBLE_ALIGN(p); |
| } |
| } |
| |
| void |
| ClusterHandler::process_large_control_msgs() |
| { |
| if (read.msg.did_large_control_msgs) { |
| return; |
| } else { |
| read.msg.did_large_control_msgs = 1; |
| } |
| |
| //////////////////////////////////////////////////////////////// |
| // Place non cluster large incoming messages on external |
| // incoming queue for processing by callout threads. |
| //////////////////////////////////////////////////////////////// |
| IncomingControl *ic = NULL; |
| uint32_t cluster_function_index; |
| |
| while ((ic = incoming_control.dequeue())) { |
| if (needByteSwap) { |
| ats_swap32((uint32_t *) ic->data); // function code |
| } |
| cluster_function_index = *((uint32_t *) ic->data); |
| ink_release_assert(cluster_function_index != SET_CHANNEL_DATA_CLUSTER_FUNCTION); |
| |
| if (cluster_function_index == (uint32_t) ~ SET_CHANNEL_DATA_CLUSTER_FUNCTION) { |
| // SET_CHANNEL_DATA_CLUSTER_FUNCTION already processed. |
| // Just do memory deallocation. |
| |
| if (!clusterFunction[SET_CHANNEL_DATA_CLUSTER_FUNCTION].fMalloced) |
| ic->freeall(); |
| continue; |
| } |
| |
| if (cluster_function_index >= (uint32_t) SIZE_clusterFunction) { |
| Warning("Bad cluster function index (large control)"); |
| ic->freeall(); |
| |
| } else if (clusterFunction[cluster_function_index].ClusterFunc) { |
| // Cluster message, process in ET_CLUSTER thread |
| clusterFunction[cluster_function_index].pfn(this, (void *) (ic->data + sizeof(int32_t)), |
| ic->len - sizeof(int32_t)); |
| |
| // Deallocate memory |
| if (!clusterFunction[cluster_function_index].fMalloced) |
| ic->freeall(); |
| |
| } else { |
| // Non Cluster message, process in non ET_CLUSTER thread |
| ink_atomiclist_push(&external_incoming_control, (void *) ic); |
| } |
| } |
| } |
| |
| void |
| ClusterHandler::process_freespace_msgs() |
| { |
| if (read.msg.did_freespace_msgs) { |
| return; |
| } else { |
| read.msg.did_freespace_msgs = 1; |
| } |
| |
| int i; |
| // |
| // unpack CLUSTER_SEND_FREE (VC free space) messages and update |
| // the free space in the target VC(s). |
| // |
| for (i = 0; i < read.msg.count; i++) { |
| if (read.msg.descriptor[i].type == CLUSTER_SEND_FREE && read.msg.descriptor[i].channel != CLUSTER_CONTROL_CHANNEL) { |
| int c = read.msg.descriptor[i].channel; |
| if (c < n_channels && VALID_CHANNEL(channels[c]) && |
| (CLUSTER_SEQUENCE_NUMBER(channels[c]->token.sequence_number) == read.msg.descriptor[i].sequence_number)) { |
| // |
| // VC received freespace message, move it to the |
| // current bucket, since it may have data to |
| // write (WRITE_VC_PRIORITY). |
| // |
| channels[c]->remote_free = read.msg.descriptor[i].length; |
| vcs_push(channels[c], VC_CLUSTER_WRITE); |
| } |
| } |
| } |
| } |
| |
| void |
| ClusterHandler::add_to_byte_bank(ClusterVConnection * vc) |
| { |
| ByteBankDescriptor *bb_desc = ByteBankDescriptor::ByteBankDescriptor_alloc(vc->read_block); |
| bool pending_byte_bank_completion = vc->byte_bank_q.head ? true : false; |
| |
| // Put current byte bank descriptor on completion list |
| vc->byte_bank_q.enqueue(bb_desc); |
| |
| // Start byte bank completion action if not active |
| if (!pending_byte_bank_completion) { |
| ClusterVC_remove_read(vc); |
| delayed_reads.push(vc); |
| CLUSTER_INCREMENT_DYN_STAT(CLUSTER_LEVEL1_BANK_STAT); |
| } else { |
| CLUSTER_INCREMENT_DYN_STAT(CLUSTER_MULTILEVEL_BANK_STAT); |
| } |
| vc->read_block = 0; |
| } |
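| |
| // Byte bank data queued above is drained later by finish_delayed_reads(), |
| // which retries the VC read VIO mutex and, once it is held, credits the |
| // banked blocks to the VC through complete_channel_read(). |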
| |
| void |
| ClusterHandler::update_channels_read() |
| { |
| // |
| // Update channels from which data has been read. |
| // |
| int i; |
| int len; |
| // This isn't used. |
| // int nread = read.bytes_xfered; |
| |
| process_set_data_msgs(); |
| |
| // |
| // update the ClusterVConnections |
| // |
| for (i = 0; i < read.msg.count; i++) { |
| if (read.msg.descriptor[i].type == CLUSTER_SEND_DATA && read.msg.descriptor[i].channel != CLUSTER_CONTROL_CHANNEL) { |
| ClusterVConnection *vc = channels[read.msg.descriptor[i].channel]; |
| if (VALID_CHANNEL(vc) && |
| (read.msg.descriptor[i].sequence_number) == CLUSTER_SEQUENCE_NUMBER(vc->token.sequence_number)) { |
| vc->last_activity_time = current_time; // note activity time |
| |
| len = read.msg.descriptor[i].length; |
| if (!len) { |
| continue; |
| } |
| |
| if (!vc->pending_remote_fill && vc_ok_read(vc) |
| && (!((ProxyMutex *) vc->read_locked) || vc->byte_bank_q.head)) { |
| // |
| // Byte bank active or unable to acquire lock on VC. |
| // Move data into the byte bank and attempt delivery |
| // at the next periodic event. |
| // |
| vc->read_block->fill(len); // note bytes received |
| add_to_byte_bank(vc); |
| |
| } else { |
| if (vc->pending_remote_fill || ((ProxyMutex *) vc->read_locked && vc_ok_read(vc))) { |
| vc->read_block->fill(len); // note bytes received |
| if (!vc->pending_remote_fill) { |
| vc->read.vio.buffer.writer()->append_block(vc->read_block->clone()); |
| vc->read_block->consume(len); // note bytes moved to user |
| } |
| complete_channel_read(len, vc); |
| } |
| } |
| } |
| } |
| } |
| |
| // Process control and freespace messages |
| process_small_control_msgs(); |
| process_large_control_msgs(); |
| process_freespace_msgs(); |
| } |
| |
| // |
| // This member function is run in a non-ET_CLUSTER thread, which |
| // performs the input message processing on behalf of ET_CLUSTER. |
| // The primary motivation is to allow blocking and unbounded runtime |
| // for message processing, which cannot be done in an ET_CLUSTER thread. |
| // |
| int |
| ClusterHandler::process_incoming_callouts(ProxyMutex * m) |
| { |
| ProxyMutex *mutex = m; |
| ink_hrtime now; |
| // |
| // Atomically dequeue all active requests from the external queue and |
| // move them to the local working queue. Insertion queue order is |
| // maintained. |
| // |
| Queue<IncomingControl> local_incoming_control; |
| IncomingControl *ic_ext_next; |
| IncomingControl *ic_ext; |
| |
| while (true) { |
| ic_ext = (IncomingControl *) |
| ink_atomiclist_popall(&external_incoming_control); |
| if (!ic_ext) |
| break; |
| |
| while (ic_ext) { |
| ic_ext_next = (IncomingControl *) ic_ext->link.next; |
| ic_ext->link.next = NULL; |
| local_incoming_control.push(ic_ext); |
| ic_ext = ic_ext_next; |
| } |
| |
| // Perform callout actions for each message. |
| int small_control_msg; |
| IncomingControl *ic = NULL; |
| |
| while ((ic = local_incoming_control.pop())) { |
| LOG_EVENT_TIME(ic->recognized_time, inmsg_time_dist, inmsg_events); |
| |
| // Determine if this is a small control message |
| small_control_msg = IsHighBitSet(&ic->len); |
| ClearHighBit(&ic->len); // Clear small msg flag bit |
| |
| if (small_control_msg) { |
| int len = ic->len; |
| char *p = ic->data; |
| uint32_t cluster_function_index = *(uint32_t *) p; |
| p += sizeof(uint32_t); |
| |
| if (cluster_function_index < (uint32_t) SIZE_clusterFunction) { |
| //////////////////////////////// |
| // Invoke processing function |
| //////////////////////////////// |
| ink_assert(!clusterFunction[cluster_function_index].ClusterFunc); |
| clusterFunction[cluster_function_index].pfn(this, p, len - sizeof(int32_t)); |
| now = ink_get_hrtime(); |
| CLUSTER_SUM_DYN_STAT(CLUSTER_CTRL_MSGS_RECV_TIME_STAT, now - ic->recognized_time); |
| } else { |
| Warning("2Bad cluster function index (small control)"); |
| } |
| // Deallocate memory (guard the table access against a bad function index) |
| if (cluster_function_index >= (uint32_t) SIZE_clusterFunction || |
| !clusterFunction[cluster_function_index].fMalloced) |
| ic->freeall(); |
| |
| } else { |
| ink_assert(ic->len > 4); |
| uint32_t cluster_function_index = *(uint32_t *) ic->data; |
| bool valid_index; |
| |
| if (cluster_function_index < (uint32_t) SIZE_clusterFunction) { |
| valid_index = true; |
| //////////////////////////////// |
| // Invoke processing function |
| //////////////////////////////// |
| ink_assert(!clusterFunction[cluster_function_index].ClusterFunc); |
| clusterFunction[cluster_function_index].pfn(this, (void *) (ic->data + sizeof(int32_t)), |
| ic->len - sizeof(int32_t)); |
| now = ink_get_hrtime(); |
| CLUSTER_SUM_DYN_STAT(CLUSTER_CTRL_MSGS_RECV_TIME_STAT, now - ic->recognized_time); |
| } else { |
| valid_index = false; |
| Warning("2Bad cluster function index (large control)"); |
| } |
| if (valid_index && !clusterFunction[cluster_function_index].fMalloced) |
| ic->freeall(); |
| } |
| } |
| } |
| return EVENT_CONT; |
| } |
| |
| void |
| ClusterHandler::update_channels_partial_read() |
| { |
| // |
| // We were unable to read the computed amount. Reflect the partial |
| // amount read in the associated VC read buffer data structures. |
| // |
| int i; |
| int64_t res = read.bytes_xfered; |
| |
| if (!res) { |
| return; |
| } |
| ink_assert(res <= read.did); |
| |
| // how much of the iov was done |
| |
| int64_t iov_done[MAX_TCOUNT]; |
| int64_t total = 0; |
| int64_t already_read = read.did - read.bytes_xfered; |
| |
| for (i = 0; i < read.n_iov; i++) { |
| ink_release_assert(already_read >= 0); |
| iov_done[i] = read.iov[i].iov_len; |
| |
| // Skip over bytes already processed |
| if (already_read) { |
| already_read -= iov_done[i]; |
| if (already_read < 0) { |
| iov_done[i] = -already_read; // bytes remaining |
| already_read = 0; |
| } else { |
| iov_done[i] = 0; |
| continue; |
| } |
| } |
| // Adjustments for partial read for the current transfer |
| res -= iov_done[i]; |
| if (res < 0) { |
| iov_done[i] += res; |
| res = 0; |
| } else { |
| total += iov_done[i]; |
| } |
| } |
| ink_assert(total <= read.did); |
| |
| int read_all_large_control_msgs = 0; |
| // |
| // update the ClusterVConnections buffer pointers |
| // |
| for (i = 0; i < read.msg.count; i++) { |
| if (read.msg.descriptor[i].type == CLUSTER_SEND_DATA && read.msg.descriptor[i].channel != CLUSTER_CONTROL_CHANNEL) { |
| ClusterVConnection *vc = channels[read.msg.descriptor[i].channel]; |
| if (VALID_CHANNEL(vc) && |
| (read.msg.descriptor[i].sequence_number) == CLUSTER_SEQUENCE_NUMBER(vc->token.sequence_number)) { |
| if (vc->pending_remote_fill || (vc_ok_read(vc) && (vc->iov_map != CLUSTER_IOV_NONE))) { |
| vc->last_activity_time = current_time; // note activity time |
| ClusterVConnState *s = &vc->read; |
| ink_assert(vc->iov_map < read.n_iov); |
| int len = iov_done[vc->iov_map]; |
| |
| if (len) { |
| if (!read_all_large_control_msgs) { |
| // |
| // Since all large set_data control messages reside at the |
| // beginning, all have been read if the first non-control |
| // descriptor contains > 0 bytes. |
| // Process them ahead of any VC data completion actions |
| // followed by small control and freespace message processing. |
| // |
| process_set_data_msgs(); |
| process_small_control_msgs(); |
| process_freespace_msgs(); |
| read_all_large_control_msgs = 1; |
| } |
| iov_done[vc->iov_map] = 0; |
| vc->read_block->fill(len); // note bytes received |
| |
| if (!vc->pending_remote_fill) { |
| if ((ProxyMutex *) vc->read_locked) { |
| Debug("cluster_vc_xfer", "Partial read, credit ch %d %p %d bytes", vc->channel, vc, len); |
| s->vio.buffer.writer()->append_block(vc->read_block->clone()); |
| if (complete_channel_read(len, vc)) { |
| vc->read_block->consume(len); // note bytes moved to user |
| } |
| |
| } else { |
| // If we have all the data for the VC, move it |
| // into the byte bank. Otherwise, do nothing since |
| // we will resume the read at this VC. |
| |
| if (len == (int) read.msg.descriptor[i].length) { |
| Debug("cluster_vc_xfer", "Partial read, byte bank move ch %d %p %d bytes", vc->channel, vc, len); |
| add_to_byte_bank(vc); |
| } |
| } |
| } else { |
| Debug("cluster_vc_xfer", "Partial remote fill read, credit ch %d %p %d bytes", vc->channel, vc, len); |
| complete_channel_read(len, vc); |
| } |
| read.msg.descriptor[i].length -= len; |
| ink_assert(((int) read.msg.descriptor[i].length) >= 0); |
| } |
| Debug(CL_TRACE, "partial_channel_read chan=%d len=%d", vc->channel, len); |
| } |
| } |
| } |
| } |
| } |
| |
| bool ClusterHandler::complete_channel_read(int len, ClusterVConnection * vc) |
| { |
| // |
| // We have processed a complete VC read request message for a channel, |
| // perform completion actions. |
| // |
| ClusterVConnState *s = &vc->read; |
| |
| if (vc->pending_remote_fill) { |
| Debug(CL_TRACE, "complete_channel_read chan=%d len=%d", vc->channel, len); |
| vc->initial_data_bytes += len; |
| ++vc->pending_remote_fill; // Note completion |
| return (vc->closed ? false : true); |
| } |
| |
| if (vc->closed) |
| return false; // No action if already closed |
| |
| ink_assert((ProxyMutex *) s->vio.mutex == (ProxyMutex *) s->vio._cont->mutex); |
| |
| Debug("cluster_vc_xfer", "Complete read, credit ch %d %p %d bytes", vc->channel, vc, len); |
| s->vio.ndone += len; |
| |
| if (s->vio.ntodo() <= 0) { |
| s->enabled = 0; |
| if (cluster_signal_and_update_locked(VC_EVENT_READ_COMPLETE, vc, s) |
| == EVENT_DONE) |
| return false; |
| } else { |
| if (cluster_signal_and_update_locked(VC_EVENT_READ_READY, vc, s) == EVENT_DONE) |
| return false; |
| if (s->vio.ntodo() <= 0) |
| s->enabled = 0; |
| } |
| |
| vcs_push(vc, VC_CLUSTER_READ); |
| return true; |
| } |
| |
| void |
| ClusterHandler::finish_delayed_reads() |
| { |
| // |
| // Process pending VC delayed reads generated in the last read from |
| // the node to node connection. For explanation of "delayed read" see |
| // comments at the beginning of the member functions for ClusterHandler. |
| // |
| ClusterVConnection *vc = NULL; |
| DLL<ClusterVConnectionBase> l; |
| while ((vc = (ClusterVConnection *) delayed_reads.pop())) { |
| MUTEX_TRY_LOCK_SPIN(lock, vc->read.vio.mutex, thread, READ_LOCK_SPIN_COUNT); |
| if (lock) { |
| if (vc_ok_read(vc)) { |
| ink_assert(!vc->read.queue); |
| ByteBankDescriptor *d; |
| |
| while ((d = vc->byte_bank_q.dequeue())) { |
| if (vc->read.queue) { |
| // Previous complete_channel_read() put us back on the list, |
| // remove ourselves to process another byte bank completion |
| ClusterVC_remove_read(vc); |
| } |
| Debug("cluster_vc_xfer", |
| "Delayed read, credit ch %d %p %" PRId64" bytes", vc->channel, vc, d->get_block()->read_avail()); |
| vc->read.vio.buffer.writer()->append_block(d->get_block()); |
| |
| if (complete_channel_read(d->get_block()->read_avail(), vc)) { |
| ByteBankDescriptor::ByteBankDescriptor_free(d); |
| } else { |
| ByteBankDescriptor::ByteBankDescriptor_free(d); |
| break; |
| } |
| } |
| } |
| } else |
| l.push(vc); |
| } |
| delayed_reads = l; |
| } |
| |
| void |
| ClusterHandler::update_channels_written() |
| { |
| // |
| // We have successfully pushed the write data for the VC(s) described |
| // by the descriptors. |
| // Move the channels in this bucket to a new bucket. |
| // Lower the priority of those with too little data and raise that of |
| // those with too much data. |
| // |
| ink_hrtime now; |
| for (int i = 0; i < write.msg.count; i++) { |
| if (write.msg.descriptor[i].type == CLUSTER_SEND_DATA) { |
| if (write.msg.descriptor[i].channel != CLUSTER_CONTROL_CHANNEL) { |
| ClusterVConnection *vc = channels[write.msg.descriptor[i].channel]; |
| if (VALID_CHANNEL(vc) && |
| (write.msg.descriptor[i].sequence_number) == CLUSTER_SEQUENCE_NUMBER(vc->token.sequence_number)) { |
| |
| if (vc->pending_remote_fill) { |
| Debug(CL_TRACE, |
| "update_channels_written chan=%d seqno=%d len=%d", |
| write.msg.descriptor[i].channel, |
| write.msg.descriptor[i].sequence_number, write.msg.descriptor[i].length); |
| vc->pending_remote_fill = 0; |
| vc->remote_write_block = 0; // free data block |
| continue; // ignore remote write fill VC(s) |
| } |
| |
| ClusterVConnState *s = &vc->write; |
| int len = write.msg.descriptor[i].length; |
| vc->write_bytes_in_transit -= len; |
| ink_release_assert(vc->write_bytes_in_transit >= 0); |
| Debug(CL_PROTO, "(%d) data sent %d %" PRId64, write.msg.descriptor[i].channel, len, s->vio.ndone); |
| |
| if (vc_ok_write(vc)) { |
| vc->last_activity_time = current_time; // note activity time |
| int64_t ndone = vc->was_closed() ? 0 : s->vio.ndone; |
| |
| if (ndone < vc->remote_free) { |
| vcs_push(vc, VC_CLUSTER_WRITE); |
| } |
| } |
| } |
| } else { |
| // |
| // Free up outgoing control message space |
| // |
| OutgoingControl *oc = write.msg.outgoing_control.dequeue(); |
| oc->free_data(); |
| oc->mutex = NULL; |
| now = ink_get_hrtime(); |
| CLUSTER_SUM_DYN_STAT(CLUSTER_CTRL_MSGS_SEND_TIME_STAT, now - oc->submit_time); |
| LOG_EVENT_TIME(oc->submit_time, cluster_send_time_dist, cluster_send_events); |
| oc->freeall(); |
| } |
| } |
| } |
| // |
| // For compound messages, deallocate the data and header descriptors. |
| // The deallocation of the data descriptor will indirectly invoke |
| // the free memory proc described in set_data. |
| // |
| invoke_remote_data_args *args; |
| OutgoingControl *hdr_oc; |
| while ((hdr_oc = write.msg.outgoing_callout.dequeue())) { |
| args = (invoke_remote_data_args *) (hdr_oc->data + sizeof(int32_t)); |
| ink_assert(args->magicno == invoke_remote_data_args::MagicNo); |
| |
| // Free data descriptor |
| args->data_oc->free_data(); // invoke memory free callback |
| args->data_oc->mutex = NULL; |
| args->data_oc->freeall(); |
| |
| // Free descriptor |
| hdr_oc->free_data(); |
| hdr_oc->mutex = NULL; |
| now = ink_get_hrtime(); |
| CLUSTER_SUM_DYN_STAT(CLUSTER_CTRL_MSGS_SEND_TIME_STAT, now - hdr_oc->submit_time); |
| LOG_EVENT_TIME(hdr_oc->submit_time, cluster_send_time_dist, cluster_send_events); |
| hdr_oc->freeall(); |
| } |
| } |
| |
| int |
| ClusterHandler::build_write_descriptors() |
| { |
| // |
| // Construct the write descriptors for VC write data in the current |
| // write_vcs bucket with considerations for maximum elements per |
| // write (struct iovec system maximum). |
| // |
| int count_bucket = cur_vcs; |
| int tcount = write.msg.count + 2; // count + descriptor require 2 iovec(s) |
| int write_descriptors_built = 0; |
| int valid; |
| int list_len = 0; |
| ClusterVConnection *vc, *vc_next; |
| |
| // |
| // Build descriptors for connections with stuff to send. |
| // |
| vc = (ClusterVConnection *)ink_atomiclist_popall(&write_vcs_ready); |
| while (vc) { |
| enter_exit(&cls_build_writes_entered, &cls_writes_exited); |
| vc_next = (ClusterVConnection *) vc->ready_alink.next; |
| vc->ready_alink.next = NULL; |
| list_len++; |
| if (VC_CLUSTER_CLOSED == vc->type) { |
| vc->in_vcs = false; |
| vc->type = VC_NULL; |
| clusterVCAllocator.free(vc); |
| vc = vc_next; |
| continue; |
| } |
| |
| if (tcount >= MAX_TCOUNT) { |
| vcs_push(vc, VC_CLUSTER_WRITE); |
| } else { |
| vc->in_vcs = false; |
| cluster_reschedule_offset(this, vc, &vc->write, 0); |
| tcount++; |
| } |
| vc = vc_next; |
| } |
| if (list_len) { |
| CLUSTER_SUM_DYN_STAT(CLUSTER_VC_WRITE_LIST_LEN_STAT, list_len); |
| } |
| |
| tcount = write.msg.count + 2; |
| vc_next = (ClusterVConnection *) write_vcs[count_bucket].head; |
| while (vc_next) { |
| vc = vc_next; |
| vc_next = (ClusterVConnection *) vc->write.link.next; |
| |
| if (VC_CLUSTER_CLOSED == vc->type) { |
| vc->type = VC_NULL; |
| clusterVCAllocator.free(vc); |
| vc = vc_next; |
| continue; |
| } |
| |
| if (tcount >= MAX_TCOUNT) |
| break; |
| |
| valid = valid_for_data_write(vc); |
| if (-1 == valid) { |
| vcs_push(vc, VC_CLUSTER_WRITE); |
| } else if (valid) { |
| ink_assert(vc->write_locked); // Acquired in valid_for_data_write() |
| if ((vc->remote_free > (vc->write.vio.ndone - vc->write_list_bytes)) |
| && channels[vc->channel] == vc) { |
| |
| ink_assert(vc->write_list && vc->write_list_bytes); |
| |
| int d = write.msg.count; |
| write.msg.descriptor[d].type = CLUSTER_SEND_DATA; |
| write.msg.descriptor[d].channel = vc->channel; |
| write.msg.descriptor[d].sequence_number = vc->token.sequence_number; |
| int s = vc->write_list_bytes; |
| ink_release_assert(s <= MAX_CLUSTER_SEND_LENGTH); |
| |
| // Transfer no more than nbytes |
| if ((vc->write.vio.ndone - s) > vc->write.vio.nbytes) |
| s = vc->write.vio.nbytes - (vc->write.vio.ndone - s); |
| |
| if ((vc->write.vio.ndone - s) > vc->remote_free) |
| s = vc->remote_free - (vc->write.vio.ndone - s); |
| write.msg.descriptor[d].length = s; |
| write.msg.count++; |
| tcount++; |
| write_descriptors_built++; |
| |
| #ifdef CLUSTER_STATS |
| _vc_writes++; |
| _vc_write_bytes += s; |
| #endif |
| |
| } else { |
| MUTEX_UNTAKE_LOCK(vc->write_locked, thread); |
| vc->write_locked = NULL; |
| |
| if (channels[vc->channel] == vc) |
| CLUSTER_INCREMENT_DYN_STAT(CLUSTER_NO_REMOTE_SPACE_STAT); |
| } |
| } |
| } |
| return (write_descriptors_built); |
| } |
| |
| int |
| ClusterHandler::build_freespace_descriptors() |
| { |
| // |
| // Construct the write descriptors for VC freespace data in the current |
| // read_vcs bucket with considerations for maximum elements per |
| // write (struct iovec system maximum) and for pending elements already |
| // in the list. |
| // |
| int count_bucket = cur_vcs; |
| int tcount = write.msg.count + 2; // count + descriptor require 2 iovec(s) |
| int freespace_descriptors_built = 0; |
| int s = 0; |
| int list_len = 0; |
| ClusterVConnection *vc, *vc_next; |
| |
| // |
| // Build descriptors for available space |
| // |
| vc = (ClusterVConnection *)ink_atomiclist_popall(&read_vcs_ready); |
| while (vc) { |
| enter_exit(&cls_build_reads_entered, &cls_reads_exited); |
| vc_next = (ClusterVConnection *) vc->ready_alink.next; |
| vc->ready_alink.next = NULL; |
| list_len++; |
| if (VC_CLUSTER_CLOSED == vc->type) { |
| vc->in_vcs = false; |
| vc->type = VC_NULL; |
| clusterVCAllocator.free(vc); |
| vc = vc_next; |
| continue; |
| } |
| |
| if (tcount >= MAX_TCOUNT) { |
| vcs_push(vc, VC_CLUSTER_READ); |
| } else { |
| vc->in_vcs = false; |
| cluster_reschedule_offset(this, vc, &vc->read, 0); |
| tcount++; |
| } |
| vc = vc_next; |
| } |
| if (list_len) { |
| CLUSTER_SUM_DYN_STAT(CLUSTER_VC_READ_LIST_LEN_STAT, list_len); |
| } |
| |
| tcount = write.msg.count + 2; |
| vc_next = (ClusterVConnection *) read_vcs[count_bucket].head; |
| while (vc_next) { |
| vc = vc_next; |
| vc_next = (ClusterVConnection *) vc->read.link.next; |
| |
| if (VC_CLUSTER_CLOSED == vc->type) { |
| vc->type = VC_NULL; |
| clusterVCAllocator.free(vc); |
| vc = vc_next; |
| continue; |
| } |
| |
| if (tcount >= MAX_TCOUNT) |
| break; |
| |
| s = valid_for_freespace_write(vc); |
| if (-1 == s) { |
| vcs_push(vc, VC_CLUSTER_READ); |
| } else if (s) { |
| if (vc_ok_read(vc) && channels[vc->channel] == vc) { |
| // Send free space only if changed |
| int d = write.msg.count; |
| write.msg.descriptor[d].type = CLUSTER_SEND_FREE; |
| write.msg.descriptor[d].channel = vc->channel; |
| write.msg.descriptor[d].sequence_number = vc->token.sequence_number; |
| |
| ink_assert(s > 0); |
| write.msg.descriptor[d].length = s; |
| vc->last_local_free = s; |
| Debug(CL_PROTO, "(%d) free space priority %d", vc->channel, vc->read.priority); |
| write.msg.count++; |
| tcount++; |
| freespace_descriptors_built++; |
| } |
| } |
| vc = vc_next; |
| } |
| return (freespace_descriptors_built); |
| } |
| |
| int |
| ClusterHandler::build_controlmsg_descriptors() |
| { |
| // |
| // Construct the write descriptors for control message data in the |
| // outgoing_control queue with considerations for maximum elements per |
| // write (struct iovec system maximum) and for elements already |
| // in the list. |
| // |
| int tcount = write.msg.count + 2; // count + descriptor require 2 iovec(s) |
| int control_msgs_built = 0; |
| bool compound_msg; // msg + chan data |
| // |
| // Build descriptors for control messages |
| // |
| OutgoingControl *c = NULL; |
| int control_bytes = 0; |
| int q = 0; |
| |
| while (tcount < (MAX_TCOUNT - 1)) { // -1 to allow for compound messages |
| c = outgoing_control[q].pop(); |
| if (!c) { |
      // Refill the local queue from the shared atomic list (outgoing_control_al)
| OutgoingControl *c_next; |
| c = (OutgoingControl *) ink_atomiclist_popall(&outgoing_control_al[q]); |
| if (c == 0) { |
| if (++q >= CLUSTER_CMSG_QUEUES) { |
| break; |
| } else { |
| continue; |
| } |
| } |
| while (c) { |
| c_next = (OutgoingControl *) c->link.next; |
| c->link.next = NULL; |
| outgoing_control[q].push(c); |
| c = c_next; |
| } |
| continue; |
| |
| } else { |
| compound_msg = (*((int32_t *) c->data) == -1); // (msg+chan data)? |
| } |
| if (!compound_msg && c->len <= SMALL_CONTROL_MESSAGE && |
        // check whether the receiving cluster function expects malloc'ed data
| !clusterFunction[*(int32_t *) c->data].fMalloced && control_bytes + c->len + sizeof(int32_t) * 2 + 7 < CONTROL_DATA) { |
| write.msg.outgoing_small_control.enqueue(c); |
| control_bytes += c->len + sizeof(int32_t) * 2 + 7; // safe approximation |
| control_msgs_built++; |
| |
| if (clusterFunction[*(int32_t *) c->data].post_pfn) { |
| clusterFunction[*(int32_t *) c->data].post_pfn(this, c->data + sizeof(int32_t), c->len); |
| } |
| continue; |
| } |
| // |
| // Build large control message descriptor |
| // |
| if (compound_msg) { |
| // Extract out components of compound message. |
| invoke_remote_data_args *cmhdr = (invoke_remote_data_args *) (c->data + sizeof(int32_t)); |
| OutgoingControl *oc_header = c; |
| OutgoingControl *oc_msg = cmhdr->msg_oc; |
| OutgoingControl *oc_data = cmhdr->data_oc; |
| |
| ink_assert(cmhdr->magicno == invoke_remote_data_args::MagicNo); |
| // |
      // Build descriptors, ordering the channel data ahead of the reply
      // message.  Reply message processing assumes the data completion
      // action has already been performed before the completion message
      // is handled.  Not an issue today, since channel data is always
      // processed first.
| // |
| int d; |
| d = write.msg.count; |
| write.msg.descriptor[d].type = CLUSTER_SEND_DATA; |
| write.msg.descriptor[d].channel = cmhdr->dest_channel; |
| write.msg.descriptor[d].length = oc_data->len; |
| write.msg.descriptor[d].sequence_number = cmhdr->token.sequence_number; |
| |
| #ifdef CLUSTER_STATS |
| _vc_write_bytes += oc_data->len; |
| #endif |
| |
| // Setup remote write fill iovec. Remote write fills have no VIO. |
| ClusterVConnection *vc = channels[cmhdr->dest_channel]; |
| |
| if (VALID_CHANNEL(vc) && vc->pending_remote_fill) { |
| ink_release_assert(!vc->remote_write_block); |
| vc->remote_write_block = oc_data->get_block(); |
| |
| // Note: No array overrun since we are bounded by (MAX_TCOUNT-1). |
| write.msg.count++; |
| tcount++; |
| control_msgs_built++; |
| d = write.msg.count; |
| write.msg.outgoing_control.enqueue(oc_msg); |
| write.msg.descriptor[d].type = CLUSTER_SEND_DATA; |
| write.msg.descriptor[d].channel = CLUSTER_CONTROL_CHANNEL; |
| write.msg.descriptor[d].length = oc_msg->len; |
| |
| #ifdef CLUSTER_STATS |
| _control_write_bytes += oc_msg->len; |
| #endif |
| |
| write.msg.count++; |
| tcount++; |
| control_msgs_built++; |
| |
        // Queue the header so the buffer-free memory callbacks run after the send.
| write.msg.outgoing_callout.enqueue(oc_header); |
| |
| } else { |
        // Operation cancelled; free the memory.
| Warning("Pending remote read fill aborted chan=%d len=%d", cmhdr->dest_channel, oc_data->len); |
| |
| // Free compound message |
| oc_header->free_data(); |
| oc_header->mutex = NULL; |
| oc_header->freeall(); |
| |
| // Free response message |
| oc_msg->free_data(); |
        oc_msg->mutex = NULL;
| oc_msg->freeall(); |
| |
| // Free data descriptor |
| oc_data->free_data(); // invoke memory free callback |
        oc_data->mutex = NULL;
| oc_data->freeall(); |
| } |
| |
| } else { |
| write.msg.outgoing_control.enqueue(c); |
| |
| int d = write.msg.count; |
| write.msg.descriptor[d].type = CLUSTER_SEND_DATA; |
| write.msg.descriptor[d].channel = CLUSTER_CONTROL_CHANNEL; |
| write.msg.descriptor[d].length = c->len; |
| |
| #ifdef CLUSTER_STATS |
| _control_write_bytes += c->len; |
| #endif |
| |
| write.msg.count++; |
| tcount++; |
| control_msgs_built++; |
| |
| if (clusterFunction[*(int32_t *) c->data].post_pfn) { |
| clusterFunction[*(int32_t *) c->data].post_pfn(this, c->data + sizeof(int32_t), c->len); |
| } |
| } |
| } |
| return control_msgs_built; |
| } |
| |
| int |
| ClusterHandler::add_small_controlmsg_descriptors() |
| { |
| // |
| // Move small control message data to free space after descriptors |
| // |
| char *p = (char *) &write.msg.descriptor[write.msg.count]; |
| OutgoingControl *c = NULL; |
| |
| while ((c = write.msg.outgoing_small_control.dequeue())) { |
| *(int32_t *) p = c->len; |
| p += sizeof(int32_t); |
| memcpy(p, c->data, c->len); |
| c->free_data(); |
| c->mutex = NULL; |
| p += c->len; |
| ink_hrtime now = ink_get_hrtime(); |
| CLUSTER_SUM_DYN_STAT(CLUSTER_CTRL_MSGS_SEND_TIME_STAT, now - c->submit_time); |
| LOG_EVENT_TIME(c->submit_time, cluster_send_time_dist, cluster_send_events); |
| c->freeall(); |
| p = (char *) DOUBLE_ALIGN(p); |
| } |
| write.msg.control_bytes = p - (char *) &write.msg.descriptor[write.msg.count]; |
| |
| #ifdef CLUSTER_STATS |
| _control_write_bytes += write.msg.control_bytes; |
| #endif |
| |
| return 1; |
| } |
| |
| struct DestructorLock |
| { |
| DestructorLock(EThread * thread) |
| { |
| have_lock = false; |
| t = thread; |
| } |
| ~DestructorLock() |
| { |
| if (have_lock && m) { |
| Mutex_unlock(m, t); |
| } |
| m = 0; |
| } |
| EThread *t; |
| Ptr<ProxyMutex> m; |
| bool have_lock; |
| }; |
| |
| int |
| ClusterHandler::valid_for_data_write(ClusterVConnection * vc) |
| { |
| // |
| // Determine if writes are allowed on this VC |
| // |
| ClusterVConnState *s = &vc->write; |
| |
| ink_assert(!on_stolen_thread); |
| ink_assert((ProxyMutex *) ! vc->write_locked); |
| |
| // |
| // Attempt to get the lock, if we miss, push vc into the future |
| // |
| DestructorLock lock(thread); |
| |
| retry: |
| if ((lock.m = s->vio.mutex)) { |
| lock.have_lock = MUTEX_TAKE_TRY_LOCK_FOR_SPIN(lock.m, thread, s->vio._cont, WRITE_LOCK_SPIN_COUNT); |
| if (!lock.have_lock) { |
| CLUSTER_INCREMENT_DYN_STAT(CLUSTER_CONNECTIONS_WRITE_LOCKED_STAT); |
| |
| #ifdef CLUSTER_STATS |
| _dw_missed_lock++; |
| #endif |
| return -1; |
| } |
| } |
| |
| if (vc->was_closed()) { |
| if (vc->schedule_write()) { |
| #ifdef CLUSTER_TOMCAT |
| ink_assert(lock.m); |
| #endif |
| vc->write_locked = lock.m; |
| lock.m = 0; |
| lock.have_lock = false; |
| return 1; |
| } else { |
| if (!vc->write_bytes_in_transit) { |
| close_ClusterVConnection(vc); |
| } |
| return 0; |
| } |
| } |
| |
| if (!s->enabled && !vc->was_remote_closed()) { |
| #ifdef CLUSTER_STATS |
| _dw_not_enabled++; |
| #endif |
| return 0; |
| } |
| |
| if (vc->pending_remote_fill) { |
| if (vc->was_remote_closed()) |
| close_ClusterVConnection(vc); |
| |
| #ifdef CLUSTER_STATS |
| _dw_wait_remote_fill++; |
| #endif |
| return 0; |
| } |
| |
| if (!lock.have_lock || !s->vio.mutex || !s->vio._cont) { |
| if (!lock.have_lock && s->vio.mutex && s->vio._cont) { |
| goto retry; |
| } else { |
| // No active VIO |
| #ifdef CLUSTER_STATS |
| _dw_no_active_vio++; |
| #endif |
| return 0; |
| } |
| } |
| // |
| // If this connection has been closed remotely, send EOS |
| // |
| if (vc->was_remote_closed()) { |
| if (!vc->write_bytes_in_transit && !vc->schedule_write()) { |
| remote_close(vc, s); |
| } |
| return 0; |
| } |
| // |
| // If not enabled or not WRITE |
| // |
| if (!s->enabled || s->vio.op != VIO::WRITE) { |
| s->enabled = 0; |
| #ifdef CLUSTER_STATS |
| _dw_not_enabled_or_no_write++; |
| #endif |
| return 0; |
| } |
| // |
| // If no room on the remote side or set_data() messages pending |
| // |
| int set_data_msgs_pending = vc->n_set_data_msgs; |
| if (set_data_msgs_pending || (vc->remote_free <= (s->vio.ndone - vc->write_list_bytes))) { |
| if (set_data_msgs_pending) { |
| CLUSTER_INCREMENT_DYN_STAT(CLUSTER_VC_WRITE_STALL_STAT); |
| |
| #ifdef CLUSTER_STATS |
| _dw_set_data_pending++; |
| #endif |
| |
| } else { |
| #ifdef CLUSTER_STATS |
| _dw_no_free_space++; |
| #endif |
| } |
| return 0; |
| } |
| // |
| // Calculate amount writable |
| // |
| MIOBufferAccessor & buf = s->vio.buffer; |
| |
| int64_t towrite = buf.reader()->read_avail(); |
| int64_t ntodo = s->vio.ntodo(); |
| bool write_vc_signal = false; |
| |
| if (towrite > ntodo) |
| towrite = ntodo; |
| |
| ink_assert(ntodo >= 0); |
| if (ntodo <= 0) { |
| cluster_signal_and_update(VC_EVENT_WRITE_COMPLETE, vc, s); |
| return 0; |
| } |
| if (buf.writer()->write_avail() && towrite != ntodo) { |
| write_vc_signal = true; |
| if (cluster_signal_and_update(VC_EVENT_WRITE_READY, vc, s) == EVENT_DONE) |
| return 0; |
| ink_assert(s->vio.ntodo() >= 0); |
| if (s->vio.ntodo() <= 0) { |
| cluster_signal_and_update(VC_EVENT_WRITE_COMPLETE, vc, s); |
| return 0; |
| } |
| } |
| // Clone nbytes of vio.buffer.reader IOBufferBlock list allowing |
| // write_list to contain no more than DEFAULT_MAX_BUFFER_SIZE bytes. |
| |
| Ptr<IOBufferBlock> b_list; |
| IOBufferBlock *b_tail; |
| int bytes_to_fill; |
| int consume_bytes; |
| |
| bytes_to_fill = DEFAULT_MAX_BUFFER_SIZE - vc->write_list_bytes; |
| |
| if (towrite && bytes_to_fill) { |
| consume_bytes = (towrite > bytes_to_fill) ? bytes_to_fill : towrite; |
| b_list = clone_IOBufferBlockList(s->vio.buffer.reader()->block, |
| s->vio.buffer.reader()->start_offset, consume_bytes, &b_tail); |
| ink_assert(b_tail); |
| |
| // Append cloned IOBufferBlock list to VC write_list. |
| |
| if (vc->write_list_tail) { |
| vc->write_list_tail->next = b_list; |
| } else { |
| vc->write_list = b_list; |
| } |
| vc->write_list_tail = b_tail; |
| vc->write_list_bytes += consume_bytes; |
| ink_assert(bytes_IOBufferBlockList(vc->write_list, 1) == vc->write_list_bytes); |
| |
| // We may defer the write, but tell the user we have consumed the data. |
| |
| (s->vio.buffer.reader())->consume(consume_bytes); |
| s->vio.ndone += consume_bytes; |
| if (s->vio.ntodo() <= 0) { |
| cluster_signal_and_update_locked(VC_EVENT_WRITE_COMPLETE, vc, s); |
| } |
| } |
| |
| if (vc->schedule_write()) { |
| #ifdef CLUSTER_TOMCAT |
| ink_assert(s->vio.mutex); |
| #endif |
| vc->write_locked = lock.m; |
| lock.m = 0; |
| lock.have_lock = false; |
| return 1; |
| } else { |
| if (!write_vc_signal && buf.writer()->write_avail() && towrite != ntodo) |
| cluster_signal_and_update(VC_EVENT_WRITE_READY, vc, s); |
| return 0; |
| } |
| } |
| |
| int |
| ClusterHandler::valid_for_freespace_write(ClusterVConnection * vc) |
| { |
| // |
| // Determine if freespace messages are allowed on this VC |
| // |
| ClusterVConnState *s = &vc->read; |
| |
| ink_assert(!on_stolen_thread); |
| |
| // |
| // Attempt to get the lock, if we miss, push vc into the future |
| // |
| DestructorLock lock(thread); |
| |
| retry: |
| if ((lock.m = s->vio.mutex)) { |
| lock.have_lock = MUTEX_TAKE_TRY_LOCK_FOR_SPIN(lock.m, thread, s->vio._cont, READ_LOCK_SPIN_COUNT); |
| |
| if (!lock.have_lock) { |
| CLUSTER_INCREMENT_DYN_STAT(CLUSTER_CONNECTIONS_READ_LOCKED_STAT); |
| |
| #ifdef CLUSTER_STATS |
| _fw_missed_lock++; |
| #endif |
| return -1; |
| } |
| } |
| if (vc->was_closed()) { |
| if (!vc->write_bytes_in_transit && !vc->schedule_write()) { |
| close_ClusterVConnection(vc); |
| } |
| return 0; |
| } |
| |
| if (!s->enabled && !vc->was_remote_closed()) { |
| #ifdef CLUSTER_STATS |
| _fw_not_enabled++; |
| #endif |
| return 0; |
| } |
| |
| if (vc->pending_remote_fill) { |
| if (vc->was_remote_closed()) |
| close_ClusterVConnection(vc); |
| |
| #ifdef CLUSTER_STATS |
| _fw_wait_remote_fill++; |
| #endif |
| return 0; |
| } |
| |
| if (!lock.have_lock || !s->vio.mutex || !s->vio._cont) { |
| if (!lock.have_lock && s->vio.mutex && s->vio._cont) { |
| goto retry; |
| } else { |
| // No active VIO |
| #ifdef CLUSTER_STATS |
| _fw_no_active_vio++; |
| #endif |
| return 0; |
| } |
| } |
| // |
| // If this connection has been closed remotely, send EOS |
| // |
| if (vc->was_remote_closed()) { |
| if (vc->write_bytes_in_transit || vc->schedule_write()) { |
| // Defer close until write data is pushed |
| return 0; |
| } |
| remote_close(vc, s); |
| return 0; |
| } |
| // |
  // If not enabled or not READ
| // |
| if (!s->enabled || s->vio.op != VIO::READ) { |
| #ifdef CLUSTER_STATS |
| _fw_not_enabled_or_no_read++; |
| #endif |
| return 0; |
| } |
| |
| int64_t ntodo = s->vio.ntodo(); |
| ink_assert(ntodo >= 0); |
| |
| if (ntodo <= 0) { |
| cluster_signal_and_update(VC_EVENT_READ_COMPLETE, vc, s); |
| return 0; |
| } |
| |
| int64_t bytes_to_move = vc->initial_data_bytes; |
| if (vc->read_block && bytes_to_move) { |
| |
| // Push initial read data into VC |
| |
| if (ntodo >= bytes_to_move) { |
| Debug("cluster_vc_xfer", "finish initial data push ch %d bytes %" PRId64, vc->channel, vc->read_block->read_avail()); |
| |
| s->vio.buffer.writer()->append_block(vc->read_block->clone()); |
| vc->read_block = 0; |
| |
| } else { |
| bytes_to_move = ntodo; |
| |
| Debug("cluster_vc_xfer", "initial data push ch %d bytes %" PRId64, vc->channel, bytes_to_move); |
| |
| // Clone a portion of the data |
| |
| IOBufferBlock *b, *btail; |
| b = clone_IOBufferBlockList(vc->read_block, 0, bytes_to_move, &btail); |
| s->vio.buffer.writer()->append_block(b); |
| vc->read_block->consume(bytes_to_move); |
| } |
| s->vio.ndone += bytes_to_move; |
| vc->initial_data_bytes -= bytes_to_move; |
| |
| if (s->vio.ntodo() <= 0) { |
| s->enabled = 0; |
| cluster_signal_and_update_locked(VC_EVENT_READ_COMPLETE, vc, s); |
| return 0; |
| |
| } else { |
| if (vc->have_all_data) { |
| if (!vc->read_block) { |
| s->enabled = 0; |
| cluster_signal_and_update(VC_EVENT_EOS, vc, s); |
| return 0; |
| } |
| } |
| if (cluster_signal_and_update_locked(VC_EVENT_READ_READY, vc, s) |
| == EVENT_DONE) |
        return 0;
| |
| if (s->vio.ntodo() <= 0) |
| s->enabled = 0; |
| |
| if (vc->initial_data_bytes) |
| return 0; |
| } |
| } |
| // At this point, all initial read data passed in the open_read reply |
| // has been moved into the user VC. |
| // Now allow send of freespace to receive additional data. |
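  // The candidate value is vio.ndone rounded up to the next
  // DEFAULT_MAX_BUFFER_SIZE boundary; once that reaches at least half of
  // the previously advertised value, the advertisement is pushed out to
  // last_local_free + 8 * DEFAULT_MAX_BUFFER_SIZE.  A freespace message is
  // generated only when the value grows (or on the first advertisement).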
| |
| int64_t nextfree = vc->read.vio.ndone; |
| |
| nextfree = (nextfree + DEFAULT_MAX_BUFFER_SIZE - 1) / DEFAULT_MAX_BUFFER_SIZE; |
| nextfree *= DEFAULT_MAX_BUFFER_SIZE; |
| |
| if (nextfree >= (vc->last_local_free / 2)) { |
| nextfree = vc->last_local_free + (8 * DEFAULT_MAX_BUFFER_SIZE); |
| } |
| |
| if ((vc->last_local_free == 0) || (nextfree >= vc->last_local_free)) { |
| Debug(CL_PROTO, "(%d) update freespace %" PRId64, vc->channel, nextfree); |
| // |
| // Have good VC candidate locked for freespace write |
| // |
| return nextfree; |
| |
| } else { |
| // No free space update required |
| return 0; |
| } |
| } |
| |
| void |
| ClusterHandler::vcs_push(ClusterVConnection * vc, int type) |
| { |
| if (vc->type <= VC_CLUSTER) |
| vc->type = type; |
| |
| while ((vc->type > VC_CLUSTER) && !vc->in_vcs && ink_atomic_cas(pvint32(&vc->in_vcs), 0, 1)) { |
| if (vc->type == VC_CLUSTER_READ) |
| ink_atomiclist_push(&vc->ch->read_vcs_ready, (void *)vc); |
| else |
| ink_atomiclist_push(&vc->ch->write_vcs_ready, (void *)vc); |
| return; |
| } |
| } |
| |
| int |
| ClusterHandler::remote_close(ClusterVConnection * vc, ClusterVConnState * ns) |
| { |
| if (ns->vio.op != VIO::NONE && !vc->closed) { |
| ns->enabled = 0; |
| if (vc->remote_closed > 0) { |
| if (ns->vio.op == VIO::READ) { |
| if (ns->vio.nbytes == ns->vio.ndone) { |
| return cluster_signal_and_update(VC_EVENT_READ_COMPLETE, vc, ns); |
| } else { |
| return cluster_signal_and_update(VC_EVENT_EOS, vc, ns); |
| } |
| } else { |
| return cluster_signal_and_update(VC_EVENT_EOS, vc, ns); |
| } |
| } else { |
| return cluster_signal_error_and_update(vc, ns, vc->remote_lerrno); |
| } |
| } |
| return EVENT_CONT; |
| } |
| |
| void |
| ClusterHandler::steal_thread(EThread * t) |
| { |
| // |
| // Attempt to push the control message now instead of waiting |
| // for the periodic event to process it. |
| // |
| if (t != thread && // different thread to steal |
| write.to_do <= 0 && // currently not trying to send data |
| // nothing big outstanding |
| !write.msg.count) { |
| mainClusterEvent(CLUSTER_EVENT_STEAL_THREAD, (Event *) t); |
| } |
| } |
| |
| void |
| ClusterHandler::free_locks(bool read_flag, int i) |
| { |
| // |
| // Free VC locks. Handle partial acquires up to i |
| // |
| if (i == CLUSTER_FREE_ALL_LOCKS) { |
| if (read_flag) { |
| i = (read.msg.state >= 2 ? read.msg.count : 0); |
| } else { |
| i = write.msg.count; |
| } |
| } |
| ClusterState & s = (read_flag ? read : write); |
| for (int j = 0; j < i; j++) { |
| if (s.msg.descriptor[j].type == CLUSTER_SEND_DATA && s.msg.descriptor[j].channel != CLUSTER_CONTROL_CHANNEL) { |
| ClusterVConnection *vc = channels[s.msg.descriptor[j].channel]; |
| if (VALID_CHANNEL(vc)) { |
| if (read_flag) { |
| if ((ProxyMutex *) vc->read_locked) { |
| MUTEX_UNTAKE_LOCK(vc->read.vio.mutex, thread); |
| vc->read_locked = NULL; |
| } |
| } else { |
| if ((ProxyMutex *) vc->write_locked) { |
| MUTEX_UNTAKE_LOCK(vc->write_locked, thread); |
| vc->write_locked = NULL; |
| } |
| } |
| } |
| } else if (!read_flag && |
| s.msg.descriptor[j].type == CLUSTER_SEND_FREE && |
| s.msg.descriptor[j].channel != CLUSTER_CONTROL_CHANNEL) { |
| ClusterVConnection *vc = channels[s.msg.descriptor[j].channel]; |
| if (VALID_CHANNEL(vc)) { |
| if ((ProxyMutex *) vc->read_locked) { |
| MUTEX_UNTAKE_LOCK(vc->read_locked, thread); |
| vc->read_locked = NULL; |
| } |
| } |
| } |
| } |
| } |
| |
| #ifdef CLUSTER_IMMEDIATE_NETIO |
| void |
| ClusterHandler::build_poll(bool next) |
| { |
| Pollfd *pfd; |
| if (next) { |
| pfd = thread->nextPollDescriptor->alloc(); |
| pfd->fd = net_vc->get_socket(); |
| ifd = pfd - thread->nextPollDescriptor->pfd; |
| } else { |
| pfd = thread->pollDescriptor->alloc(); |
| pfd->fd = net_vc->get_socket(); |
| ifd = pfd - thread->pollDescriptor->pfd; |
| } |
| pfd->events = POLLHUP; |
| if (next) { |
| if (read.to_do) |
| pfd->events |= POLLIN; |
| if (write.to_do) |
| pfd->events |= POLLOUT; |
| } else { |
| // we have to lie since we are in the same cycle |
| pfd->events = POLLIN | POLLOUT; |
| // reads/writes are non-blocking anyway |
| pfd->revents = POLLIN | POLLOUT; |
| } |
| } |
| #endif // CLUSTER_IMMEDIATE_NETIO |
| |
| extern int CacheClusterMonitorEnabled; |
| extern int CacheClusterMonitorIntervalSecs; |
| |
| |
| // |
| // The main event for machine-machine link |
| // |
| int |
| ClusterHandler::mainClusterEvent(int event, Event * e) |
| { |
| // Set global time |
| current_time = ink_get_hrtime(); |
| |
| if (CacheClusterMonitorEnabled) { |
| if ((current_time - last_trace_dump) > HRTIME_SECONDS(CacheClusterMonitorIntervalSecs)) { |
| last_trace_dump = current_time; |
| dump_internal_data(); |
| } |
| } |
| // |
| // Note: The caller always acquires the ClusterHandler mutex prior |
| // to the call. This guarantees single threaded access in |
| // mainClusterEvent() |
| // |
| |
| ///////////////////////////////////////////////////////////////////////// |
| // If cluster interconnect is overloaded, disable remote cluster ops. |
| ///////////////////////////////////////////////////////////////////////// |
| #ifndef DEBUG |
| if (clm && ClusterLoadMonitor::cf_monitor_enabled > 0) { |
| #else |
| if (0) { |
| #endif |
| bool last_state = disable_remote_cluster_ops; |
| if (clm->is_cluster_overloaded()) { |
| disable_remote_cluster_ops = true; |
| } else { |
| disable_remote_cluster_ops = false; |
| } |
| if (last_state != disable_remote_cluster_ops) { |
| if (disable_remote_cluster_ops) { |
| Note("Network congestion to [%u.%u.%u.%u] encountered, reverting to proxy only mode", DOT_SEPARATED(ip)); |
| } else { |
| Note("Network congestion to [%u.%u.%u.%u] cleared, reverting to cache mode", DOT_SEPARATED(ip)); |
| last_cluster_op_enable = current_time; |
| } |
| } |
| } |
| |
| on_stolen_thread = (event == CLUSTER_EVENT_STEAL_THREAD); |
| bool io_callback = (event == EVENT_IMMEDIATE); |
| |
| if (on_stolen_thread) { |
| thread = (EThread *) e; |
| } else { |
| if (io_callback) { |
| thread = this_ethread(); |
| } else { |
| thread = e->ethread; |
| } |
| } |
| |
| int io_activity = 1; |
| bool only_write_control_msgs; |
  int res = 0;
| |
| while (io_activity) { |
| io_activity = 0; |
    only_write_control_msgs = false;
| |
| if (downing) { |
| machine_down(); |
| break; |
| } |
| |
| ////////////////////////// |
| // Read Processing |
| ////////////////////////// |
| if (!on_stolen_thread) { |
| if (delayed_reads.head) { |
| CLUSTER_INCREMENT_DYN_STAT(CLUSTER_DELAYED_READS_STAT); |
| finish_delayed_reads(); |
| } |
| if ((res = process_read(current_time)) < 0) { |
| break; |
| } |
| io_activity += res; |
| |
| if (delayed_reads.head) { |
| CLUSTER_INCREMENT_DYN_STAT(CLUSTER_DELAYED_READS_STAT); |
| finish_delayed_reads(); |
| } |
| } |
| ///////////////////////// |
| // Write Processing |
| ///////////////////////// |
| if ((res = process_write(current_time, only_write_control_msgs)) < 0) { |
| break; |
| } |
| io_activity += res; |
| |
| ///////////////////////////////////////// |
| // Process deferred open_local requests |
| ///////////////////////////////////////// |
| if (!on_stolen_thread) { |
| if (do_open_local_requests()) |
| thread->signal_hook(thread); |
| } |
| } |
| |
| #ifdef CLUSTER_IMMEDIATE_NETIO |
| if (!dead && ((event == EVENT_POLL) || (event == EVENT_INTERVAL))) { |
| if (res >= 0) { |
| build_poll(true); |
| } |
| } |
| #endif |
| return EVENT_CONT; |
| } |
| |
| int |
| ClusterHandler::process_read(ink_hrtime /* now ATS_UNUSED */) |
| { |
| #ifdef CLUSTER_STATS |
| _process_read_calls++; |
| #endif |
| if (dead) { |
| // Node is down |
| return 0; |
| } |
| /////////////////////////////// |
| // Cluster read state machine |
| /////////////////////////////// |
| |
| for (;;) { |
| |
| switch (read.state) { |
| /////////////////////////////////////////////// |
| case ClusterState::READ_START: |
| /////////////////////////////////////////////// |
| { |
| #ifdef CLUSTER_STATS |
| _n_read_start++; |
| #endif |
| read.msg.clear(); |
| read.start_time = ink_get_hrtime(); |
| if (build_initial_vector(CLUSTER_READ)) { |
| read.state = ClusterState::READ_HEADER; |
| } else { |
| return 0; |
| } |
| break; |
| } |
| /////////////////////////////////////////////// |
| case ClusterState::READ_HEADER: |
| /////////////////////////////////////////////// |
| { |
| #ifdef CLUSTER_STATS |
| _n_read_header++; |
| #endif |
| read.state = ClusterState::READ_AWAIT_HEADER; |
| if (!read.doIO()) { |
| // i/o not initiated, retry later |
| read.state = ClusterState::READ_HEADER; |
| return 0; |
| } |
| break; |
| } |
| /////////////////////////////////////////////// |
| case ClusterState::READ_AWAIT_HEADER: |
| /////////////////////////////////////////////// |
| { |
| #ifdef CLUSTER_STATS |
| _n_read_await_header++; |
| #endif |
| if (!read.io_complete) { |
| return 0; |
| } else { |
| if (read.io_complete < 0) { |
| // read error, declare node down |
| machine_down(); |
| return -1; |
| } |
| } |
| if (read.to_do) { |
| if (read.bytes_xfered) { |
| CLUSTER_INCREMENT_DYN_STAT(CLUSTER_PARTIAL_READS_STAT); |
| read.state = ClusterState::READ_HEADER; |
| break; |
| } else { |
| // Zero byte read |
| read.state = ClusterState::READ_HEADER; |
| return 0; |
| } |
| } else { |
| #ifdef MSG_TRACE |
| fprintf(t_fd, |
| "[R] seqno=%d count=%d control_bytes=%d count_check=%d dsum=%d csum=%d\n", |
| read.sequence_number, |
| read.msg.hdr()->count, read.msg.hdr()->control_bytes, |
| read.msg.hdr()->count_check, read.msg.hdr()->descriptor_cksum, read.msg.hdr()->control_bytes_cksum); |
| fflush(t_fd); |
| #endif |
| CLUSTER_SUM_DYN_STAT(CLUSTER_READ_BYTES_STAT, read.did); |
| if (needByteSwap) { |
| read.msg.hdr()->SwapBytes(); |
| } |
| read.msg.count = read.msg.hdr()->count; |
| read.msg.control_bytes = read.msg.hdr()->control_bytes; |
| read.msg.descriptor_cksum = read.msg.hdr()->descriptor_cksum; |
| read.msg.control_bytes_cksum = read.msg.hdr()->control_bytes_cksum; |
| read.msg.unused = read.msg.hdr()->unused; |
| |
| if (MAGIC_COUNT(read) != read.msg.hdr()->count_check) { |
| ink_assert(!"Read bad ClusterMsgHeader data"); |
| Warning("Bad ClusterMsgHeader read on [%d.%d.%d.%d], restarting", DOT_SEPARATED(ip)); |
| Note("Cluster read from [%u.%u.%u.%u] failed, declaring down", DOT_SEPARATED(ip)); |
| machine_down(); |
| return -1; |
| } |
| |
| if (read.msg.count || read.msg.control_bytes) { |
| read.msg.state++; |
| read.state = ClusterState::READ_SETUP_DESCRIPTOR; |
| } else { |
| read.state = ClusterState::READ_COMPLETE; |
| } |
| break; |
| } |
| } |
| /////////////////////////////////////////////// |
| case ClusterState::READ_SETUP_DESCRIPTOR: |
| /////////////////////////////////////////////// |
| { |
| #ifdef CLUSTER_STATS |
| _n_read_setup_descriptor++; |
| #endif |
| if (build_initial_vector(CLUSTER_READ)) { |
| read.state = ClusterState::READ_DESCRIPTOR; |
| } else { |
| return 0; |
| } |
| break; |
| } |
| /////////////////////////////////////////////// |
| case ClusterState::READ_DESCRIPTOR: |
| /////////////////////////////////////////////// |
| { |
| #ifdef CLUSTER_STATS |
| _n_read_descriptor++; |
| #endif |
| read.state = ClusterState::READ_AWAIT_DESCRIPTOR; |
| if (!read.doIO()) { |
| // i/o not initiated, retry later |
| read.state = ClusterState::READ_DESCRIPTOR; |
| return 0; |
| } |
| break; |
| } |
| /////////////////////////////////////////////// |
| case ClusterState::READ_AWAIT_DESCRIPTOR: |
| /////////////////////////////////////////////// |
| { |
| #ifdef CLUSTER_STATS |
| _n_read_await_descriptor++; |
| #endif |
| if (!read.io_complete) { |
| return 0; |
| } else { |
| if (read.io_complete < 0) { |
| // read error, declare node down |
| machine_down(); |
| return -1; |
| } |
| } |
| if (read.to_do) { |
| if (read.bytes_xfered) { |
| CLUSTER_INCREMENT_DYN_STAT(CLUSTER_PARTIAL_READS_STAT); |
| read.state = ClusterState::READ_DESCRIPTOR; |
| break; |
| } else { |
| // Zero byte read |
| read.state = ClusterState::READ_DESCRIPTOR; |
| return 0; |
| } |
| } else { |
| #ifdef CLUSTER_MESSAGE_CKSUM |
| ink_release_assert(read.msg.calc_descriptor_cksum() == read.msg.descriptor_cksum); |
| ink_release_assert(read.msg.calc_control_bytes_cksum() == read.msg.control_bytes_cksum); |
| #endif |
| CLUSTER_SUM_DYN_STAT(CLUSTER_READ_BYTES_STAT, read.did); |
| if (needByteSwap) { |
| // Descriptors need byte swap |
| swap_descriptor_bytes(); |
| } |
| if (read.msg.count == 0) { |
| read.bytes_xfered = 0; |
| read.state = ClusterState::READ_COMPLETE; |
| } else { |
| read.msg.state++; |
| read.state = ClusterState::READ_SETUP_DATA; |
| } |
| break; |
| } |
| } |
| /////////////////////////////////////////////// |
| case ClusterState::READ_SETUP_DATA: |
| /////////////////////////////////////////////// |
| { |
| #ifdef CLUSTER_STATS |
| _n_read_setup_data++; |
| #endif |
| if (build_initial_vector(CLUSTER_READ)) { |
| free_locks(CLUSTER_READ); |
| if (read.to_do) { |
| read.state = ClusterState::READ_DATA; |
| } else { |
| // Descriptor contains no VC data |
| read.state = ClusterState::READ_COMPLETE; |
| } |
| } else { |
| return 0; |
| } |
| break; |
| } |
| /////////////////////////////////////////////// |
| case ClusterState::READ_DATA: |
| /////////////////////////////////////////////// |
| { |
| #ifdef CLUSTER_STATS |
| _n_read_data++; |
| #endif |
| ink_release_assert(read.to_do); |
| read.state = ClusterState::READ_AWAIT_DATA; |
| if (!read.doIO()) { |
| // i/o not initiated, retry later |
| read.state = ClusterState::READ_DATA; |
| return 0; |
| } |
| break; |
| } |
| /////////////////////////////////////////////// |
| case ClusterState::READ_AWAIT_DATA: |
| /////////////////////////////////////////////// |
| { |
| #ifdef CLUSTER_STATS |
| _n_read_await_data++; |
| #endif |
| if (!read.io_complete) { |
| return 0; // awaiting i/o complete |
| } else { |
| if (read.io_complete > 0) { |
| read.state = ClusterState::READ_POST_COMPLETE; |
| } else { |
| // read error, declare node down |
| machine_down(); |
| return -1; |
| } |
| } |
| break; |
| } |
| /////////////////////////////////////////////// |
| case ClusterState::READ_POST_COMPLETE: |
| /////////////////////////////////////////////// |
| { |
| #ifdef CLUSTER_STATS |
| _n_read_post_complete++; |
| #endif |
| if (!get_read_locks()) { |
| return 0; |
| } |
| if (read.to_do) { |
| if (read.bytes_xfered) { |
| update_channels_partial_read(); |
| free_locks(CLUSTER_READ); |
| CLUSTER_SUM_DYN_STAT(CLUSTER_READ_BYTES_STAT, read.bytes_xfered); |
| CLUSTER_INCREMENT_DYN_STAT(CLUSTER_PARTIAL_READS_STAT); |
| read.state = ClusterState::READ_DATA; |
| return 1; |
| } else { |
| // Zero byte read |
| free_locks(CLUSTER_READ); |
| read.state = ClusterState::READ_DATA; |
| return 0; |
| } |
| } else { |
| CLUSTER_SUM_DYN_STAT(CLUSTER_READ_BYTES_STAT, read.bytes_xfered); |
| read.state = ClusterState::READ_COMPLETE; |
| break; |
| } |
| } |
| /////////////////////////////////////////////// |
| case ClusterState::READ_COMPLETE: |
| /////////////////////////////////////////////// |
| { |
| #ifdef CLUSTER_STATS |
| _n_read_complete++; |
| #endif |
| ink_hrtime rdmsg_end_time = ink_get_hrtime(); |
| CLUSTER_SUM_DYN_STAT(CLUSTER_RDMSG_ASSEMBLE_TIME_STAT, rdmsg_end_time - read.start_time); |
| read.start_time = HRTIME_MSECONDS(0); |
| if (dump_msgs) |
| dump_read_msg(); |
| read.sequence_number++; |
| update_channels_read(); |
| free_locks(CLUSTER_READ); |
| |
| read.state = ClusterState::READ_START; |
| break; // setup next read |
| } |
| ////////////////// |
| default: |
| ////////////////// |
| { |
| ink_release_assert(!"ClusterHandler::process_read invalid state"); |
| } |
| |
| } // end of switch |
| } // end of for |
| } |
| |
| int |
| ClusterHandler::process_write(ink_hrtime now, bool only_write_control_msgs) |
| { |
| #ifdef CLUSTER_STATS |
| _process_write_calls++; |
| #endif |
| ///////////////////////////////// |
| // Cluster write state machine |
| ///////////////////////////////// |
| for (;;) { |
| |
| switch (write.state) { |
| /////////////////////////////////////////////// |
| case ClusterState::WRITE_START: |
| /////////////////////////////////////////////// |
| { |
| #ifdef CLUSTER_STATS |
| _n_write_start++; |
| #endif |
| write.msg.clear(); |
| write.last_time = ink_get_hrtime(); |
| pw_write_descriptors_built = -1; |
| pw_freespace_descriptors_built = -1; |
| pw_controldata_descriptors_built = -1; |
| pw_time_expired = 0; |
| write.state = ClusterState::WRITE_SETUP; |
| break; |
| } |
| /////////////////////////////////////////////// |
| case ClusterState::WRITE_SETUP: |
| /////////////////////////////////////////////// |
| { |
| #ifdef CLUSTER_STATS |
| _n_write_setup++; |
| #endif |
| if (!on_stolen_thread && !only_write_control_msgs) { |
| ///////////////////////////////////////////////////////////// |
| // Build a complete write descriptor containing control, |
| // data and freespace message data. |
| ///////////////////////////////////////////////////////////// |
| |
| // Control message descriptors |
| if (pw_controldata_descriptors_built) { |
| pw_controldata_descriptors_built = build_controlmsg_descriptors(); |
| } |
| // Write data descriptors |
| if (pw_write_descriptors_built) { |
| pw_write_descriptors_built = build_write_descriptors(); |
| } |
| // Free space descriptors |
| if (pw_freespace_descriptors_built) { |
| pw_freespace_descriptors_built = build_freespace_descriptors(); |
| } |
| add_small_controlmsg_descriptors(); // always last |
| } else { |
| ///////////////////////////////////////////////////////////// |
| // Build a write descriptor only containing control data. |
| ///////////////////////////////////////////////////////////// |
| pw_write_descriptors_built = 0; |
| pw_freespace_descriptors_built = 0; |
| pw_controldata_descriptors_built = build_controlmsg_descriptors(); |
| add_small_controlmsg_descriptors(); // always last |
| } |
| |
| // If nothing to write, post write completion |
| if (!pw_controldata_descriptors_built && !pw_write_descriptors_built && !pw_freespace_descriptors_built) { |
| write.state = ClusterState::WRITE_COMPLETE; |
| break; |
| } else { |
| started_on_stolen_thread = on_stolen_thread; |
| control_message_write = only_write_control_msgs; |
| } |
| |
| // Move required data into the message header |
| #ifdef CLUSTER_MESSAGE_CKSUM |
| write.msg.descriptor_cksum = write.msg.calc_descriptor_cksum(); |
| write.msg.hdr()->descriptor_cksum = write.msg.descriptor_cksum; |
| |
| write.msg.control_bytes_cksum = write.msg.calc_control_bytes_cksum(); |
| write.msg.hdr()->control_bytes_cksum = write.msg.control_bytes_cksum; |
| write.msg.unused = 0; |
| #endif |
| write.msg.hdr()->count = write.msg.count; |
| write.msg.hdr()->control_bytes = write.msg.control_bytes; |
| write.msg.hdr()->count_check = MAGIC_COUNT(write); |
| |
| ink_release_assert(build_initial_vector(CLUSTER_WRITE)); |
| free_locks(CLUSTER_WRITE); |
| write.state = ClusterState::WRITE_INITIATE; |
| break; |
| } |
| /////////////////////////////////////////////// |
| case ClusterState::WRITE_INITIATE: |
| /////////////////////////////////////////////// |
| { |
| #ifdef CLUSTER_STATS |
| _n_write_initiate++; |
| #endif |
| write.state = ClusterState::WRITE_AWAIT_COMPLETION; |
| if (!write.doIO()) { |
| // i/o not initiated, retry later |
| write.state = ClusterState::WRITE_INITIATE; |
| return 0; |
| } |
| break; |
| } |
| /////////////////////////////////////////////// |
| case ClusterState::WRITE_AWAIT_COMPLETION: |
| /////////////////////////////////////////////// |
| { |
| #ifdef CLUSTER_STATS |
| _n_write_await_completion++; |
| #endif |
| if (!write.io_complete) { |
| // Still waiting for write i/o completion |
| return 0; |
| } else { |
| if (write.io_complete < 0) { |
| // write error, declare node down |
| machine_down(); |
| write.state = ClusterState::WRITE_INITIATE; |
| break; |
| } |
| if (write.to_do) { |
| if (write.bytes_xfered) { |
| CLUSTER_INCREMENT_DYN_STAT(CLUSTER_PARTIAL_WRITES_STAT); |
| write.state = ClusterState::WRITE_INITIATE; |
| break; |
| } else { |
| // Zero byte write |
| write.state = ClusterState::WRITE_INITIATE; |
| return 0; |
| } |
| } |
| CLUSTER_SUM_DYN_STAT(CLUSTER_WRITE_BYTES_STAT, write.bytes_xfered); |
| write.sequence_number++; |
| write.state = ClusterState::WRITE_POST_COMPLETE; |
| } |
| break; |
| } |
| /////////////////////////////////////////////// |
| case ClusterState::WRITE_POST_COMPLETE: |
| /////////////////////////////////////////////// |
| { |
| #ifdef CLUSTER_STATS |
| _n_write_post_complete++; |
| #endif |
| if (!get_write_locks()) { |
| CLUSTER_INCREMENT_DYN_STAT(CLUSTER_WRITE_LOCK_MISSES_STAT); |
| return 0; |
| } |
| // |
| // Move the channels into their new buckets based on how much |
| // was written |
| // |
| update_channels_written(); |
| free_locks(CLUSTER_WRITE); |
| write.state = ClusterState::WRITE_COMPLETE; |
| break; |
| } |
| /////////////////////////////////////////////// |
| case ClusterState::WRITE_COMPLETE: |
| /////////////////////////////////////////////// |
| { |
| #ifdef CLUSTER_STATS |
| _n_write_complete++; |
| #endif |
| write.state = ClusterState::WRITE_START; |
| ink_hrtime curtime = ink_get_hrtime(); |
| |
| if (!on_stolen_thread) { |
| // |
| // Complete all work in the current bucket before moving to next |
| // |
| pw_time_expired = (curtime - now) > CLUSTER_MAX_RUN_TIME; |
| |
| if (!control_message_write && !pw_write_descriptors_built |
| && !pw_freespace_descriptors_built && !pw_controldata_descriptors_built) { |
| // skip to the next bucket |
| cur_vcs = (cur_vcs + 1) % CLUSTER_BUCKETS; |
| } |
| } else { |
| // |
| // Place an upper bound on thread stealing |
| // |
| pw_time_expired = (curtime - now) > CLUSTER_MAX_THREAD_STEAL_TIME; |
| if (pw_time_expired) { |
| CLUSTER_INCREMENT_DYN_STAT(CLUSTER_THREAD_STEAL_EXPIRES_STAT); |
| } |
| } |
| // |
| // periodic activities |
| // |
| if (!on_stolen_thread && !cur_vcs && !dead) { |
| // |
| // check if this machine is supposed to be in the cluster |
| // |
| MachineList *mc = the_cluster_machines_config(); |
| if (mc && !mc->find(ip, port)) { |
| Note("Cluster [%u.%u.%u.%u:%d] not in config, declaring down", DOT_SEPARATED(ip), port); |
| machine_down(); |
| } |
| } |
| if (pw_time_expired) { |
| return -1; // thread run time expired |
| } else { |
| if (pw_write_descriptors_built || pw_freespace_descriptors_built || pw_controldata_descriptors_built) { |
| break; // start another write |
| } else { |
| return 0; // no more data to write |
| } |
| } |
| } |
| ////////////////// |
| default: |
| ////////////////// |
| { |
| ink_release_assert(!"ClusterHandler::process_write invalid state"); |
| } |
| |
| } // End of switch |
| } // End of for |
| } |
| |
| int |
| ClusterHandler::do_open_local_requests() |
| { |
| // |
  // open_local requests that are unable to obtain the ClusterHandler
  // mutex are deferred and placed onto the external_incoming_open_local
  // queue.  Those deferred requests are processed here, within the
  // ET_CLUSTER thread.
| // |
| int pending_request = 0; |
| ClusterVConnection *cvc; |
| ClusterVConnection *cvc_ext; |
| ClusterVConnection *cvc_ext_next; |
| EThread *tt = this_ethread(); |
| Queue<ClusterVConnection> local_incoming_open_local; |
| |
| // |
| // Atomically dequeue all requests from the external queue and |
| // move them to the local working queue while maintaining insertion order. |
| // |
| while (true) { |
| cvc_ext = (ClusterVConnection *) |
| ink_atomiclist_popall(&external_incoming_open_local); |
| if (cvc_ext == 0) |
| break; |
| |
| while (cvc_ext) { |
| cvc_ext_next = (ClusterVConnection *) cvc_ext->link.next; |
| cvc_ext->link.next = NULL; |
| local_incoming_open_local.push(cvc_ext); |
| cvc_ext = cvc_ext_next; |
| } |
| |
| // Process deferred open_local requests. |
| |
| while ((cvc = local_incoming_open_local.pop())) { |
| MUTEX_TRY_LOCK(lock, cvc->action_.mutex, tt); |
| if (lock) { |
| if (cvc->start(tt) < 0) { |
| cvc->token.clear(); |
| if (cvc->action_.continuation) { |
| cvc->action_.continuation->handleEvent(CLUSTER_EVENT_OPEN_FAILED, 0); |
| clusterVCAllocator.free(cvc); |
| } |
| } |
| MUTEX_RELEASE(lock); |
| |
| } else { |
| // unable to get mutex, insert request back onto global queue. |
| Debug(CL_TRACE, "do_open_local_requests() unable to acquire mutex (cvc=%p)", cvc); |
| pending_request = 1; |
| ink_atomiclist_push(&external_incoming_open_local, (void *) cvc); |
| } |
| } |
| } |
| return pending_request; |
| } |
| |
| // End of ClusterHandler.cc |