| /** @file |
| |
| A brief file description |
| |
| @section license License |
| |
| Licensed to the Apache Software Foundation (ASF) under one |
| or more contributor license agreements. See the NOTICE file |
| distributed with this work for additional information |
| regarding copyright ownership. The ASF licenses this file |
| to you under the Apache License, Version 2.0 (the |
| "License"); you may not use this file except in compliance |
| with the License. You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, software |
| distributed under the License is distributed on an "AS IS" BASIS, |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| See the License for the specific language governing permissions and |
| limitations under the License. |
| */ |
| |
| /**************************************************************************** |
| |
| MuxVC.cc |
| |
| Description: |
| |
| |
| ****************************************************************************/ |
| |
| #include "MuxVC.h" |
| #include "HttpAccept.h" |
| #include "Main.h" |
| #include "NetVCTest.h" |
| #include "StatPages.h" |
| |
| /* For stat pages */ |
| #include "UnixNet.h" |
| |
| const int MUX_LOCK_RETRY = HRTIME_MSECONDS(10); |
| const int MUX_MAX_DATA_SIZE = USHRT_MAX - sizeof(MuxMessage); |
| |
| #define MIN(x,y) (x <= y) ? x : y; |
| #define MAX(x,y) (x >= y) ? x : y; |
| |
| #define MUX_MAX_BYTES_SLOT 32768 |
| #define MUX_MAX_BYTES_BANK 32768 |
| #define MUX_SMALL_BLOCK_SIZE 256 |
| #define MUX_WRITE_HIGH_WATER (MUX_MAX_BYTES_SLOT * 4) |
| |
| void mux_pages_init(); |
| |
| static const char * |
| control_msg_id_to_string(int msg_type) |
| { |
| |
| switch (msg_type) { |
| case INKMUX_MSG_OPEN_CHANNEL: |
| return "INKMUX_MSG_OPEN_CHANNEL"; |
| case INKMUX_MSG_CLOSE_CHANNEL: |
| return "INKMUX_MSG_CLOSE_CHANNEL"; |
| case INKMUX_MSG_SHUTDOWN_WRITE: |
| return "INKMUX_MSG_SHUTDOWN_WRITE"; |
| case INKMUX_MSG_NORMAL_DATA: |
| return "INKMUX_MSG_NORMAL_DATA"; |
| case INKMUX_MSG_OOB_DATA: |
| return "INKMUX_MSG_OOB_DATA"; |
| case INKMUX_MSG_CHANNEL_RESET: |
| return "INKMUX_MSG_CHANNEL_RESET"; |
| case INKMUX_MSG_FLOW_CONTROL_START: |
| return "INKMUX_MSG_FLOW_CONTROL_START"; |
| case INKMUX_MSG_FLOW_CONTROL_STOP: |
| return "INKMUX_MSG_FLOW_CONTROL_STOP"; |
| default: |
| "INKMUX_MSG_UNKNOWN"; |
| } |
| } |
| |
| // static void mux_move_data(MIOBuffer* copy_to, IOBufferReader* from, int nbytes) |
| // |
| // Utility routine used to move data from a IOBufferReader to a MIOBuffer |
| // If the amount of data is large enough we want to move it by reference. |
| // If it's small, small blocks are problematic so copy the data |
| // |
| static void |
| mux_move_data(MIOBuffer * copy_to, IOBufferReader * from, int nbytes) |
| { |
| |
| if (0 && nbytes > MUX_SMALL_BLOCK_SIZE) { |
| copy_to->write(from, nbytes); |
| from->consume(nbytes); |
| } else { |
| int left = nbytes; |
| |
| while (left > 0) { |
| char *block_start = from->start(); |
| int64_t block_avail = from->block_read_avail(); |
| int act_on = MIN(block_avail, left); |
| int r = copy_to->write(block_start, act_on); |
| ink_debug_assert(r == act_on); |
| from->consume(act_on); |
| left -= act_on; |
| } |
| } |
| } |
| |
| MuxClientState::MuxClientState(): |
| vio(), shutdown(false), enabled(0), flow_stopped(0) // parent MuxVC lock |
| { |
| }; |
| |
| MuxClientVC::MuxClientVC(): |
| link(), |
| id(-1), |
| magic(MUX_VC_CLIENT_MAGIC_ALIVE), |
| closed(false), |
| other_side_closed(0), |
| reentrancy_count(0), |
| need_boost(true), |
| mux_vc(NULL), |
| read_state(), |
| write_state(), |
| read_byte_bank(NULL), |
| byte_bank_reader(NULL), |
| active_timeout(0), inactive_timeout(0), active_event(NULL), inactive_event(NULL), retry_event(NULL), NetVConnection() |
| { |
| SET_HANDLER(MuxClientVC::main_handler); |
| } |
| |
| MuxClientVC::~MuxClientVC() |
| { |
| } |
| |
| void |
| MuxClientVC::init(MuxVC * mvc, int32_t id_arg) |
| { |
| |
| ink_debug_assert(!closed); |
| ink_debug_assert(magic == MUX_VC_CLIENT_MAGIC_ALIVE); |
| |
| mux_vc = mvc; |
| mutex = mux_vc->mutex; |
| |
| id = id_arg; |
| } |
| |
| // void MuxClientVC::kill() |
| // |
| // Cleans up and deallocates. |
| // |
| // Callee MUST be hold this->mutex & |
| // must have already remove this MuxClientVC |
| // from it's parent's vc list |
| // |
| void |
| MuxClientVC::kill() |
| { |
| |
| ink_debug_assert(closed == true); |
| ink_debug_assert(magic == MUX_VC_CLIENT_MAGIC_ALIVE); |
| ink_debug_assert(mutex->thread_holding == this_ethread()); |
| |
| Debug("mux_alloc", "[%d,%d] Killing client id", mux_vc->id, id); |
| |
| magic = MUX_VC_CLIENT_MAGIC_DEAD; |
| |
| if (read_byte_bank) { |
| free_MIOBuffer(read_byte_bank); |
| read_byte_bank = NULL; |
| byte_bank_reader = NULL; |
| } |
| |
| if (active_event != NULL) { |
| active_event->cancel(); |
| active_event = NULL; |
| } |
| |
| if (inactive_event != NULL) { |
| inactive_event->cancel(); |
| inactive_event = NULL; |
| } |
| |
| if (retry_event != NULL) { |
| retry_event->cancel(); |
| retry_event = NULL; |
| } |
| |
| mux_vc = NULL; |
| |
| read_state.vio, mutex = NULL; |
| write_state.vio.mutex = NULL; |
| mutex = NULL; |
| |
| delete this; |
| } |
| |
| |
| VIO * |
| MuxClientVC::do_io_read(Continuation * c, int64_t nbytes, MIOBuffer * buf) |
| { |
| |
| ink_debug_assert(!closed); |
| ink_debug_assert(magic == MUX_VC_CLIENT_MAGIC_ALIVE); |
| |
| if (read_state.vio.op == VIO::READ) { |
| Debug("mux_last", "do_io_read over nbytes %d ndone %d byte_bank %d", |
| read_state.vio.nbytes, read_state.vio.ndone, (byte_bank_reader == NULL) ? 0 : byte_bank_reader->read_avail()); |
| } |
| |
| if (buf) { |
| read_state.vio.buffer.writer_for(buf); |
| read_state.enabled = 1; |
| } else { |
| read_state.vio.buffer.clear(); |
| read_state.enabled = 0; |
| } |
| |
| read_state.vio.op = VIO::READ; |
| read_state.vio.mutex = c->mutex; |
| read_state.vio._cont = c; |
| read_state.vio.nbytes = nbytes; |
| read_state.vio.data = 0; |
| read_state.vio.ndone = 0; |
| read_state.vio.vc_server = (VConnection *) this; |
| |
| Debug("muxvc", "[%d,%d] do_io_read for %d bytes", mux_vc->id, id, nbytes); |
| |
| if (other_side_closed & MUX_OCLOSE_INBOUND_MASK) { |
| other_side_closed |= MUX_OCLOSE_NEED_READ_NOTIFY; |
| } |
| |
| setup_retry_event(0); |
| |
| return &read_state.vio; |
| } |
| |
| VIO * |
| MuxClientVC::do_io_write(Continuation * c, int64_t nbytes, IOBufferReader * abuffer, bool owner) |
| { |
| |
| ink_debug_assert(!closed); |
| ink_debug_assert(magic == MUX_VC_CLIENT_MAGIC_ALIVE); |
| ink_debug_assert(owner == false); |
| |
| if (abuffer) { |
| ink_assert(!owner); |
| write_state.vio.buffer.reader_for(abuffer); |
| write_state.enabled = 1; |
| } else { |
| write_state.vio.buffer.clear(); |
| write_state.enabled = 0; |
| } |
| |
| write_state.vio.op = VIO::WRITE; |
| write_state.vio.mutex = c->mutex; |
| write_state.vio._cont = c; |
| write_state.vio.nbytes = nbytes; |
| write_state.vio.data = 0; |
| write_state.vio.ndone = 0; |
| write_state.vio.vc_server = (VConnection *) this; |
| |
| Debug("muxvc", "[%d,%d] do_io_write for %d bytes", mux_vc->id, id, nbytes); |
| |
| if (other_side_closed & MUX_OCLOSE_OUTBOUND_MASK) { |
| other_side_closed |= MUX_OCLOSE_NEED_WRITE_NOTIFY; |
| } |
| |
| setup_retry_event(0); |
| |
| return &write_state.vio; |
| } |
| |
| void |
| MuxClientVC::reenable(VIO * vio) |
| { |
| |
| ink_debug_assert(!closed); |
| ink_debug_assert(magic == MUX_VC_CLIENT_MAGIC_ALIVE); |
| |
| Debug("muxvc", "[%d,%d] MuxClientVC::reenable %s", mux_vc->id, id, (vio->op == VIO::WRITE) ? "Write" : "Read"); |
| |
| if (vio == &read_state.vio) { |
| ink_debug_assert(vio->op == VIO::READ); |
| read_state.enabled = 1; |
| } else { |
| ink_debug_assert(vio == &write_state.vio); |
| ink_debug_assert(vio->op == VIO::WRITE); |
| write_state.enabled = 1; |
| } |
| |
| // We need to be running with MuxVC lock and |
| // on a different call stack so reschedule ourselves |
| setup_retry_event(0); |
| } |
| |
| void |
| MuxClientVC::reenable_re(VIO * vio) |
| { |
| this->reenable(vio); |
| /* |
| ink_debug_assert(!closed); |
| ink_debug_assert(magic == MUX_VC_CLIENT_MAGIC_ALIVE); |
| |
| Debug("muxvc", "[%d] reenable_re %s", id, |
| (vio->op == VIO::WRITE) ? "Write" : "Read"); |
| */ |
| } |
| |
| void |
| MuxClientVC::boost() |
| { |
| |
| // We need to get lock for netVC to boost it |
| MUTEX_TRY_LOCK(lock, mux_vc->mutex, this_ethread()); |
| if (lock) { |
| if (mux_vc->net_vc) { |
| mux_vc->net_vc->boost(); |
| } |
| } else { |
| need_boost = true; |
| setup_retry_event(10); |
| } |
| } |
| |
| void |
| MuxClientVC::do_io_close(int flag) |
| { |
| |
| ink_debug_assert(closed == false); |
| ink_debug_assert(magic == MUX_VC_CLIENT_MAGIC_ALIVE); |
| |
| Debug("muxvc", "[%d, %d] do_io_close", mux_vc->id, id); |
| |
| closed = true; |
| |
| read_state.enabled = 0; |
| read_state.vio.buffer.clear(); |
| read_state.vio.nbytes = 0; |
| |
| write_state.enabled = 0; |
| write_state.vio.buffer.clear(); |
| write_state.vio.nbytes = 0; |
| |
| // If we get the do_io_close() on a callout, we |
| // must defer processing till the callout completes |
| if (reentrancy_count != 0) { |
| return; |
| } |
| // Now we need to try remove ourselves from the |
| // the parent MuxVC. |
| MUTEX_TRY_LOCK(lock, mux_vc->mutex, this_ethread()); |
| if (lock) { |
| mux_vc->remove_client(this); |
| } else { |
| setup_retry_event(10); |
| } |
| } |
| |
| // void MuxClientVC::do_io_shutdown(ShutdownHowTo_t howto) |
| // |
| // With read side shutdown, we don't need to send |
| // any control messages since a read shutdown indicates |
| // we just need to discard data received |
| // |
| // Write shutdowns reuqire us to inform the other side |
| // that we are finished sending data so anyone doing |
| // a read will get an EOS |
| // |
| // See: "UNIX Network Programming, Vol 1, Second |
| // Edition" by Stevens, Section 6.6 for |
| // a description of shutdown behavior with |
| // regular sockets |
| // |
| void |
| MuxClientVC::do_io_shutdown(ShutdownHowTo_t howto) |
| { |
| |
| ink_debug_assert(closed == false); |
| ink_debug_assert(magic == MUX_VC_CLIENT_MAGIC_ALIVE); |
| |
| switch (howto) { |
| case IO_SHUTDOWN_READ: |
| read_state.shutdown = 1; |
| read_state.enabled = 0; |
| break; |
| case IO_SHUTDOWN_READWRITE: |
| read_state.shutdown = 1; |
| read_state.enabled = 0; |
| // FALL THROUGH |
| case IO_SHUTDOWN_WRITE: |
| { |
| write_state.shutdown = (MUX_WRITE_SHUTDOWN | MUX_WRITE_SHUTUDOWN_SEND_MSG); |
| write_state.enabled = 0; |
| setup_retry_event(0); |
| break; |
| } |
| } |
| |
| Debug("muxvc", "[%d,%d] do_io_shutdown %d", mux_vc->id, id, (int) howto); |
| } |
| |
| void |
| MuxClientVC::set_active_timeout(ink_hrtime timeout_in) |
| { |
| |
| ink_debug_assert(closed == false); |
| ink_debug_assert(magic == MUX_VC_CLIENT_MAGIC_ALIVE); |
| |
| active_timeout = timeout_in; |
| |
| // FIX - Do we need to handle the case where the timeout is set |
| // but no io has been done? |
| // |
| // FIX - Locking? |
| // |
| if (active_event) { |
| ink_assert(!active_event->cancelled); |
| active_event->cancel(); |
| active_event = NULL; |
| } |
| |
| if (active_timeout > 0) { |
| active_event = eventProcessor.schedule_in(this, active_timeout); |
| } |
| } |
| |
| void |
| MuxClientVC::set_inactivity_timeout(ink_hrtime timeout_in) |
| { |
| |
| ink_debug_assert(closed == false); |
| ink_debug_assert(magic == MUX_VC_CLIENT_MAGIC_ALIVE); |
| |
| inactive_timeout = timeout_in; |
| |
| // FIX - Do we need to handle the case where the timeout is set |
| // but no io has been done? |
| // |
| // FIX - Locking? |
| // |
| if (inactive_event) { |
| ink_assert(!inactive_event->cancelled); |
| inactive_event->cancel(); |
| inactive_event = NULL; |
| } |
| |
| if (inactive_timeout > 0) { |
| inactive_event = eventProcessor.schedule_in(this, inactive_timeout); |
| } |
| } |
| |
| void |
| MuxClientVC::cancel_active_timeout() |
| { |
| set_active_timeout(0); |
| } |
| |
| void |
| MuxClientVC::cancel_inactivity_timeout() |
| { |
| set_inactivity_timeout(0); |
| } |
| |
| ink_hrtime |
| MuxClientVC::get_active_timeout() |
| { |
| return active_timeout; |
| } |
| |
| ink_hrtime |
| MuxClientVC::get_inactivity_timeout() |
| { |
| return inactive_timeout; |
| } |
| |
| void |
| MuxClientVC::update_inactive_timeout() |
| { |
| ink_debug_assert(magic == MUX_VC_CLIENT_MAGIC_ALIVE); |
| |
| if (inactive_event) { |
| inactive_event->cancel(); |
| inactive_event = eventProcessor.schedule_in(this, inactive_timeout); |
| } |
| } |
| |
| SOCKET |
| MuxClientVC::get_socket() |
| { |
| return 0; |
| } |
| |
| const struct sockaddr_in & |
| MuxClientVC::get_local_addr() |
| { |
| return mux_vc->local_addr; |
| } |
| |
| const struct sockaddr_in & |
| MuxClientVC::get_remote_addr() |
| { |
| return mux_vc->remote_addr; |
| } |
| |
| unsigned int |
| MuxClientVC::get_local_ip() |
| { |
| return (unsigned int) get_local_addr().sin_addr.s_addr; |
| } |
| |
| int |
| MuxClientVC::get_local_port() |
| { |
| return ntohs(get_local_addr().sin_port); |
| } |
| |
| unsigned int |
| MuxClientVC::get_remote_ip() |
| { |
| return (unsigned int) get_remote_addr().sin_addr.s_addr; |
| } |
| |
| int |
| MuxClientVC::get_remote_port() |
| { |
| return ntohs(get_remote_addr().sin_port);; |
| } |
| |
| int |
| MuxClientVC::main_handler(int event, void *data) |
| { |
| |
| Debug("muxvc", "[%d,%d] client main_hander %d 0x%x", mux_vc->id, id, event, data); |
| |
| ink_release_assert(magic == MUX_VC_CLIENT_MAGIC_ALIVE); |
| ink_release_assert(event == EVENT_INTERVAL || event == EVENT_IMMEDIATE); |
| |
| Event *calling_event = (Event *) data; |
| EThread *my_ethread = this_ethread(); |
| bool read_mutex_held = false; |
| bool write_mutex_held = false; |
| Ptr<ProxyMutex> read_side_mutex = read_state.vio.mutex; |
| Ptr<ProxyMutex> write_side_mutex = write_state.vio.mutex; |
| |
| ink_debug_assert(mutex->thread_holding == my_ethread); |
| |
| if (read_side_mutex) { |
| read_mutex_held = MUTEX_TAKE_TRY_LOCK(read_side_mutex, my_ethread); |
| |
| if (!read_mutex_held) { |
| calling_event->schedule_in(MUX_LOCK_RETRY); |
| return 0; |
| } |
| } |
| |
| if (write_side_mutex) { |
| write_mutex_held = MUTEX_TAKE_TRY_LOCK(write_side_mutex, my_ethread); |
| |
| if (!write_mutex_held) { |
| if (read_mutex_held) { |
| Mutex_unlock(read_side_mutex, my_ethread); |
| calling_event->schedule_in(MUX_LOCK_RETRY); |
| return 0; |
| } |
| } |
| } |
| // At this point, we hold the lock for MuxVC (since it's shares the lock with |
| // all it's MuxClientVC's and we've got the state machine locks for both sides |
| // of the connection so we should be have exclusive access all the data structures |
| // and be free from annoying and reenables, do_io calls, etc. |
| if (data == active_event) { |
| active_event = NULL; |
| process_timeout(VC_EVENT_ACTIVE_TIMEOUT); |
| } else if (data == inactive_event) { |
| inactive_event = NULL; |
| process_timeout(VC_EVENT_INACTIVITY_TIMEOUT); |
| } else { |
| ink_release_assert(data == retry_event); |
| retry_event = NULL; |
| process_retry_event(); |
| } |
| |
| if (read_mutex_held) { |
| Mutex_unlock(read_side_mutex, my_ethread); |
| } |
| |
| if (write_mutex_held) { |
| Mutex_unlock(write_side_mutex, my_ethread); |
| } |
| |
| return 0; |
| } |
| |
| |
| // void MuxClientVC::process_timeout(int event_to_send) |
| // |
| // Sends timeouts. All the locks were already taken |
| // by main_handler |
| // |
| void |
| MuxClientVC::process_timeout(int event_to_send) |
| { |
| |
| ink_debug_assert(magic == MUX_VC_CLIENT_MAGIC_ALIVE); |
| |
| Debug("muxvc", "[%d,%d] process_timeout - event_to_send %d", mux_vc->id, id, event_to_send); |
| |
| if (closed) { |
| return; |
| } |
| |
| if (read_state.vio.op == VIO::READ && !read_state.shutdown && read_state.vio.ntodo() > 0) { |
| read_state.vio._cont->handleEvent(event_to_send, &read_state.vio); |
| } else if (write_state.vio.op == VIO::WRITE && !write_state.shutdown && write_state.vio.ntodo() > 0) { |
| write_state.vio._cont->handleEvent(event_to_send, &write_state.vio); |
| } |
| } |
| |
| |
| // void MuxClientVC::setup_retry_event(int ms) |
| // |
| // Sets up an event to this client for processing. Retry events |
| // can only be sent while holding the user sm's lock, pointed to by the |
| // vios |
| // |
| void |
| MuxClientVC::setup_retry_event(int ms) |
| { |
| if (!retry_event) { |
| if (ms > 0) { |
| retry_event = eventProcessor.schedule_in(this, HRTIME_MSECONDS(ms)); |
| } else { |
| retry_event = eventProcessor.schedule_imm(this); |
| } |
| } |
| } |
| |
| // void MuxClientVC::process_retry_event() |
| // |
| // We've gotten this event because we missed a lock or needed to do something |
| // on a different callstack. We need to figure out our state and act |
| // appropriately |
| // |
| void |
| MuxClientVC::process_retry_event() |
| { |
| |
| int bytes_written_to_mux = 0; |
| Debug("muxvc", "[%d,%d] process_retry_event", mux_vc->id, id); |
| |
| if (closed) { |
| // We missed the lock on the MuxVC during do_io_close(). |
| // This time the callee has gotten the lock for us |
| mux_vc->remove_client(this); |
| return; |
| } |
| |
| if (write_state.shutdown & MUX_WRITE_SHUTUDOWN_SEND_MSG) { |
| bytes_written_to_mux += send_write_shutdown_message(); |
| } |
| |
| if (need_boost) { |
| if (mux_vc->net_vc) { |
| mux_vc->net_vc->boost(); |
| } |
| need_boost = false; |
| } |
| |
| if (read_state.enabled) { |
| process_read_state(); |
| |
| if (closed) { |
| mux_vc->remove_client(this); |
| return; |
| } |
| } |
| |
| if (write_state.enabled) { |
| bytes_written_to_mux += process_write(); |
| |
| if (closed) { |
| mux_vc->remove_client(this); |
| return; |
| } |
| } |
| |
| if (bytes_written_to_mux > 0) { |
| if (mux_vc->write_vio && mux_vc->connect_state != MUX_CONNECTION_DROPPED) { |
| mux_vc->write_vio->reenable(); |
| } |
| } |
| } |
| |
| |
| void |
| MuxClientVC::process_read_state() |
| { |
| |
| ink_debug_assert(read_state.vio.mutex->thread_holding == this_ethread()); |
| ink_debug_assert(read_state.enabled); |
| |
| if (read_byte_bank) { |
| this->process_byte_bank(); |
| if (closed) { |
| return; |
| } |
| |
| if (read_byte_bank) { |
| Warning("Byte bank remains"); |
| } |
| } |
| // FIX - MUST ALSO CHECK FOR SHUTDOWN MSG |
| if (other_side_closed & MUX_OCLOSE_INBOUND_MASK) { |
| if (other_side_closed & MUX_OCLOSE_NEED_READ_NOTIFY && read_byte_bank == NULL) { |
| this->process_channel_close_for_read(); |
| |
| if (closed) { |
| return; |
| } |
| } |
| } else if (read_state.flow_stopped) { |
| // If the client's buffer is not full & wants more bytes, |
| // unset flow control |
| /*if (client->read_state.flow_stopped && |
| client->read_state.vio.ntodo() > 0 && |
| !client->read_state.vio.buffer.writer()->high_water()) { |
| client->read_state.flow_stopped = 0; |
| enqueue_control_message(INKMUX_MSG_FLOW_CONTROL_STOP, client->id); |
| } */ |
| |
| } |
| } |
| |
| // void MuxClientVC::process_byte_bank() |
| // |
| // Transfers bytes from the byte bank to the client |
| // read buffer |
| // |
| // CALLER must have take LOCK for client read side's |
| // VIO |
| // |
| // CALLER is responsible for handling reeentrany closes |
| // |
| int |
| MuxClientVC::process_byte_bank() |
| { |
| |
| int64_t bank_avail = byte_bank_reader->read_avail(); |
| int64_t vio_todo = read_state.vio.ntodotodo(); |
| int act_on = MIN(bank_avail, vio_todo); |
| |
| if (act_on > 0) { |
| |
| mux_move_data(read_state.vio.buffer.writer(), byte_bank_reader, act_on); |
| |
| bank_avail -= act_on; |
| |
| if (bank_avail == 0) { |
| free_MIOBuffer(read_byte_bank); |
| read_byte_bank = NULL; |
| byte_bank_reader = NULL; |
| } |
| |
| read_state.vio.ndone += act_on; |
| |
| int event; |
| if (read_state.vio.ntodo() == 0) { |
| event = VC_EVENT_READ_COMPLETE; |
| } else { |
| event = VC_EVENT_READ_READY; |
| } |
| |
| reentrancy_count++; |
| read_state.vio._cont->handleEvent(event, &read_state.vio); |
| reentrancy_count--; |
| } |
| |
| return bank_avail; |
| } |
| |
| |
| int |
| MuxClientVC::process_write() |
| { |
| int bytes_written = 0; |
| |
| ink_debug_assert(write_state.vio.mutex->thread_holding == this_ethread()); |
| ink_debug_assert(write_state.enabled); |
| |
| if (other_side_closed & MUX_OCLOSE_OUTBOUND_MASK) { |
| if (other_side_closed & MUX_OCLOSE_NEED_WRITE_NOTIFY) { |
| process_channel_close_for_write(); |
| } |
| return 0; |
| } else { |
| |
| ink_debug_assert(!closed); |
| |
| int64_t ntodo = write_state.vio.ntodo(); |
| if (ntodo == 0 || write_state.shutdown) { |
| write_state.enabled = 0; |
| return 0; |
| } |
| |
| int avail = write_state.vio.buffer.reader()->read_avail(); |
| int act_on = MIN(ntodo, avail); |
| |
| // FIX ME - if we don't send all the data in the buffer |
| // the connection can lock up because the producer |
| // expects all the data to be removed from the buffer |
| // if the watermark is 0 |
| // |
| // But by not limiting it, we allow one transaction |
| // to starve others |
| // |
| // act_on = MIN(act_on, MUX_MAX_BYTES_SLOT); |
| |
| ink_debug_assert(act_on >= 0); |
| if (act_on <= 0) { |
| Debug("muxvc", "[process_write] disabling [%d,%d]" " due to zero bytes", mux_vc->id, id); |
| write_state.enabled = 0; |
| |
| // Notify the client we're disabling it due to lack of data |
| reentrancy_count++; |
| write_state.vio._cont->handleEvent(VC_EVENT_WRITE_READY, &write_state.vio); |
| reentrancy_count--; |
| |
| return 0; |
| } |
| // If we've got too much data outstanding in the write buffer, |
| // don't add any more |
| if (mux_vc->write_high_water()) { |
| mux_vc->writes_blocked = true; |
| return 0; |
| } |
| |
| int left = act_on; |
| |
| while (left > 0) { |
| int msg_bytes = MIN(left, MUX_MAX_DATA_SIZE); |
| bytes_written += mux_vc->enqueue_control_message(INKMUX_MSG_NORMAL_DATA, id, msg_bytes); |
| mux_move_data(mux_vc->write_buffer, write_state.vio.buffer.reader(), msg_bytes); |
| left -= msg_bytes; |
| } |
| |
| write_state.vio.ndone += act_on; |
| update_inactive_timeout(); |
| |
| Debug("muxvc", "[process_write] callback for [%d,%d]" |
| " ndone %d, nbytes %d", mux_vc->id, id, write_state.vio.ndone, write_state.vio.nbytes); |
| |
| int event; |
| if (write_state.vio.ntodo() == 0) { |
| write_state.enabled = 0; |
| event = VC_EVENT_WRITE_COMPLETE; |
| } else { |
| event = VC_EVENT_WRITE_READY; |
| } |
| |
| reentrancy_count++; |
| write_state.vio._cont->handleEvent(event, &write_state.vio); |
| reentrancy_count--; |
| |
| } |
| |
| // FIX - WHAT ABOUT PENDING SHUTDOWN MESSAGE! |
| |
| return bytes_written; |
| } |
| |
| |
| // void MuxClientVC::process_channel_close_for_read() |
| // |
| // Handles sending EOS to the read side of the client |
| // when the the remote side closes the channel |
| // |
| // CALLER is responsible for handling reentrant closes |
| // |
| void |
| MuxClientVC::process_channel_close_for_read() |
| { |
| |
| ink_debug_assert(!closed); |
| ink_debug_assert(other_side_closed & MUX_OCLOSE_NEED_READ_NOTIFY); |
| ink_debug_assert(read_state.vio.mutex->thread_holding == this_ethread()); |
| ink_debug_assert(read_byte_bank == NULL); |
| |
| if (!read_state.shutdown && read_state.vio.ntodo() > 0) { |
| |
| other_side_closed &= ~MUX_OCLOSE_NEED_READ_NOTIFY; |
| |
| reentrancy_count++; |
| read_state.vio._cont->handleEvent(VC_EVENT_EOS, &read_state.vio); |
| reentrancy_count--; |
| } |
| } |
| |
| // void MuxClientVC::process_channel_close_for_write() |
| // |
| // Handles sending ERROR to the write side of the client |
| // when the the remote side closes the channel |
| // |
| // CALLER is responsible for handling reentrant closes |
| // |
| void |
| MuxClientVC::process_channel_close_for_write() |
| { |
| |
| ink_debug_assert(!closed); |
| ink_debug_assert(other_side_closed & MUX_OCLOSE_NEED_WRITE_NOTIFY); |
| ink_debug_assert(write_state.vio.mutex->thread_holding == this_ethread()); |
| |
| if (!write_state.shutdown && write_state.vio.ntodo() > 0) { |
| |
| other_side_closed &= ~MUX_OCLOSE_NEED_WRITE_NOTIFY; |
| |
| reentrancy_count++; |
| write_state.vio._cont->handleEvent(VC_EVENT_ERROR, &write_state.vio); |
| reentrancy_count--; |
| } |
| } |
| |
| int |
| MuxClientVC::send_write_shutdown_message() |
| { |
| |
| ink_debug_assert(!closed); |
| ink_debug_assert(write_state.shutdown & MUX_WRITE_SHUTUDOWN_SEND_MSG); |
| ink_debug_assert(this->mutex->thread_holding == this_ethread()); |
| |
| write_state.shutdown &= ~MUX_WRITE_SHUTUDOWN_SEND_MSG; |
| |
| return mux_vc->enqueue_control_message(INKMUX_MSG_SHUTDOWN_WRITE, id, 0); |
| } |
| |
| |
| static int next_muxvc_id = 0; |
| |
| MuxVC::MuxVC(): |
| magic(MUX_VC_MAGIC_ALIVE), |
| id(0), |
| reentrancy_count(0), |
| terminate_vc(false), |
| on_mux_list(false), |
| clients_notified_of_error(false), |
| process_event(NULL), |
| net_vc(NULL), |
| read_vio(NULL), |
| write_vio(NULL), |
| write_bytes_added(0), |
| writes_blocked(false), |
| net_connect_action(NULL), |
| return_connect_action(), |
| connect_state(MUX_NOT_CONNECTED), |
| retry_event(NULL), |
| read_buffer(NULL), |
| write_buffer(NULL), |
| read_buffer_reader(NULL), |
| read_msg_state(MUX_READ_MSG_HEADER), |
| read_msg_size(0), |
| read_msg_ndone(0), |
| current_msg_hdr(), discard_read_data(false), return_accept_action(), next_client_id(1), num_clients(0), active_clients() |
| { |
| |
| memset(&local_addr, 0, sizeof(sockaddr_in)); |
| memset(&remote_addr, 0, sizeof(sockaddr_in)); |
| } |
| |
| void |
| MuxVC::init() |
| { |
| mutex = new_ProxyMutex(); |
| id = ink_atomic_increment(&next_muxvc_id, 1); |
| |
| Debug("mux_alloc", "[%d] Created new MuxVC", id); |
| } |
| |
| void |
| MuxVC::init_from_accept(NetVConnection * nvc, Continuation * acceptc) |
| { |
| mutex = new_ProxyMutex(); |
| net_vc = nvc; |
| connect_state = MUX_CONNECTED_ACTIVE; |
| set_mux_accept(acceptc); |
| |
| init_buffers(); |
| |
| id = ink_atomic_increment(&next_muxvc_id, 1); |
| Debug("mux_alloc", "[%d] Created new MuxVC from accept", id); |
| |
| MUTEX_TAKE_LOCK(mutex, this_ethread()); |
| init_io(); |
| Mutex_unlock(mutex, mutex->thread_holding); |
| |
| } |
| |
| void |
| MuxVC::init_buffers() |
| { |
| |
| if (!read_buffer) { |
| read_buffer = new_MIOBuffer(BUFFER_SIZE_INDEX_32K); |
| ink_debug_assert(read_buffer_reader == NULL); |
| read_buffer_reader = read_buffer->alloc_reader(); |
| } |
| |
| if (!write_buffer) { |
| write_buffer = new_MIOBuffer(BUFFER_SIZE_INDEX_4K); |
| } |
| } |
| |
| void |
| MuxVC::init_io() |
| { |
| SET_HANDLER(MuxVC::state_handle_mux); |
| read_vio = net_vc->do_io_read(this, INT64_MAX, read_buffer); |
| write_vio = net_vc->do_io_write(this, INT64_MAX, write_buffer->alloc_reader()); |
| } |
| |
| |
| MuxVC::~MuxVC() |
| { |
| magic = MUX_VC_MAGIC_DEAD; |
| } |
| |
| bool |
| MuxVC::on_list(MuxClientVC * c) |
| { |
| MuxClientVC *tmp = active_clients.head; |
| |
| while (tmp) { |
| if (c == tmp) { |
| return true; |
| } |
| tmp = tmp->link.next; |
| } |
| return false; |
| } |
| |
| Action * |
| MuxVC::do_connect(Continuation * c, unsigned int ip, int port) |
| { |
| |
| ink_debug_assert(magic == MUX_VC_MAGIC_ALIVE); |
| ink_debug_assert(return_connect_action.continuation == NULL); |
| ink_debug_assert(connect_state == MUX_NOT_CONNECTED); |
| |
| reentrancy_count++; |
| connect_state = MUX_NET_CONNECT_ISSUED; |
| |
| return_connect_action = c; |
| SET_HANDLER(MuxVC::state_handle_connect); |
| |
| Debug("muxvc", "MuxVC::do_connect issued to %u.%u.%u.%u port %d", |
| ((unsigned char *) &ip)[0], |
| ((unsigned char *) &ip)[1], ((unsigned char *) &ip)[2], ((unsigned char *) &ip)[3], port); |
| |
| // Keep our own mutex ref as we can get deallocted on the |
| // on the callback |
| ProxyMutexPtr my_mutex_ref = mutex; |
| |
| // Fix Me: need to respect interface binding |
| MUTEX_TAKE_LOCK(my_mutex_ref, this_ethread()); |
| Action *tmp = netProcessor.connect_re(this, ip, port); |
| Mutex_unlock(my_mutex_ref, my_mutex_ref->thread_holding); |
| |
| if (tmp != ACTION_RESULT_DONE) { |
| net_connect_action = tmp; |
| } |
| |
| Debug("mux_open", "do_connect state is %d", connect_state); |
| reentrancy_count--; |
| |
| switch (connect_state) { |
| case MUX_NET_CONNECT_ISSUED: |
| return &return_connect_action; |
| case MUX_WAIT_FOR_READY: |
| return &return_connect_action; |
| case MUX_CONNECT_FAILED: |
| kill(); |
| return ACTION_RESULT_DONE; |
| default: |
| ink_release_assert(0); |
| break; |
| } |
| |
| return NULL; |
| } |
| |
| int |
| MuxVC::state_handle_connect(int event, void *data) |
| { |
| |
| ink_release_assert(magic == MUX_VC_MAGIC_ALIVE); |
| ink_debug_assert(net_vc == NULL); |
| |
| Debug("muxvc", "MuxVC::connect_handler event %d", event); |
| Debug("mux_open", "MuxVC::connect_handler event %d", event); |
| net_connect_action = NULL; |
| |
| switch (event) { |
| case NET_EVENT_OPEN: |
| // FIX - Unix & NT connect behavior differ |
| connect_state = MUX_WAIT_FOR_READY; |
| net_vc = (NetVConnection *) data; |
| setup_connect_check(); |
| break; |
| case NET_EVENT_OPEN_FAILED: |
| connect_state = MUX_CONNECT_FAILED; |
| state_send_init_response(EVENT_NONE, NULL); |
| break; |
| default: |
| ink_release_assert(0); |
| break; |
| } |
| |
| return 0; |
| } |
| |
| // int MuxVC::state_wait_for_ready(int event, void* data) |
| // |
| // Checks to see if a socket goes ready or timesout |
| // after a connect |
| // |
| int |
| MuxVC::state_wait_for_ready(int event, void *data) |
| { |
| |
| ink_release_assert(magic == MUX_VC_MAGIC_ALIVE); |
| ink_debug_assert(connect_state == MUX_WAIT_FOR_READY); |
| |
| Debug("muxvc", "MuxVC::state_wait_for_ready event %d", event); |
| Debug("mux_open", "MuxVC::state_wait_for_ready event %d", event); |
| |
| SET_HANDLER(MuxVC::state_send_init_response); |
| |
| switch (event) { |
| case VC_EVENT_WRITE_READY: |
| ink_debug_assert(data == write_vio); |
| connect_state = MUX_CONNECTED_ACTIVE; |
| net_vc->cancel_inactivity_timeout(); |
| net_vc->do_io_write(this, 0, NULL); |
| local_addr = net_vc->get_local_addr(); |
| remote_addr = net_vc->get_remote_addr(); |
| write_vio = NULL; |
| state_send_init_response(EVENT_NONE, NULL); |
| break; |
| case VC_EVENT_INACTIVITY_TIMEOUT: |
| case VC_EVENT_ERROR: |
| connect_state = MUX_CONNECT_FAILED; |
| net_vc->do_io_close(0); |
| net_vc = NULL; |
| state_send_init_response(EVENT_NONE, NULL); |
| break; |
| default: |
| ink_release_assert(0); |
| break; |
| } |
| |
| return 0; |
| } |
| |
| // int MuxVC::state_send_init_response(int event, void* data) |
| // |
| // sends an event in response to the do_connect() call |
| // |
| int |
| MuxVC::state_send_init_response(int event, void *data) |
| { |
| |
| ink_debug_assert(event == EVENT_NONE || (event == EVENT_INTERVAL && data == retry_event)); |
| |
| if (event == EVENT_INTERVAL) { |
| retry_event = NULL; |
| } |
| |
| MUTEX_TRY_LOCK(lock, return_connect_action.mutex, this_ethread()); |
| |
| if (!lock) { |
| Debug("mux_open", "[MuxVC::state_send_init_response] lock missed, retrying"); |
| retry_event = eventProcessor.schedule_in(this, MUX_LOCK_RETRY); |
| return 0; |
| } |
| |
| if (!return_connect_action.cancelled) { |
| Continuation *callback_c = return_connect_action.continuation; |
| return_connect_action = NULL; |
| |
| switch (connect_state) { |
| case MUX_CONNECTED_ACTIVE: |
| Debug("mux_open", "[MuxVC::state_send_init_response] sending MUX_EVENT_OPEN"); |
| callback_c->handleEvent(MUX_EVENT_OPEN); |
| init_buffers(); |
| init_io(); |
| return_connect_action = NULL; |
| break; |
| case MUX_CONNECT_FAILED: |
| Debug("mux_open", "[MuxVC::state_send_init_response] sending MUX_EVENT_FAILED"); |
| callback_c->handleEvent(MUX_EVENT_OPEN_FAILED); |
| |
| // We doing lazy reentrancy counting. Only doing |
| // where we know there are issues. So if the count |
| // is zero no one is blocking us from deallocating |
| // ourselves |
| if (reentrancy_count == 0) { |
| kill(); |
| } |
| break; |
| default: |
| ink_release_assert(0); |
| break; |
| |
| } |
| |
| } else { |
| return_connect_action = NULL; |
| kill(); |
| } |
| |
| return 0; |
| } |
| |
| // void MuxVC:::setup_connect_check() |
| // |
| // On Unix platforms, connect is non-blocking and doesn't acutally tell |
| // you if the connect succeeded. We need to set up a write and wait |
| // for write ready to see if the connect actually worked |
| // |
| void |
| MuxVC::setup_connect_check() |
| { |
| |
| write_buffer = new_MIOBuffer(BUFFER_SIZE_INDEX_1K); |
| IOBufferReader *r = write_buffer->alloc_reader(); |
| |
| SET_HANDLER(MuxVC::state_wait_for_ready); |
| |
| // FIX - ready timeout should be tunable |
| net_vc->set_inactivity_timeout(HRTIME_SECONDS(30)); |
| |
| ink_debug_assert(write_vio == NULL); |
| write_vio = net_vc->do_io_write(this, INT64_MAX, r); |
| } |
| |
| |
| Action * |
| MuxVC::set_mux_accept(Continuation * c) |
| { |
| return_accept_action = c; |
| return &return_accept_action; |
| } |
| |
| void |
| MuxVC::kill() |
| { |
| |
| ink_debug_assert(mutex->thread_holding == this_ethread()); |
| ink_debug_assert(reentrancy_count == 0); |
| ink_release_assert(num_clients == 0); |
| |
| Debug("mux_alloc", "[%d] Cleaning up MuxVC", id); |
| |
| magic = MUX_VC_MAGIC_DEAD; |
| |
| if (net_vc) { |
| net_vc->do_io_close(0); |
| net_vc = NULL; |
| } |
| |
| if (net_connect_action) { |
| net_connect_action->cancel(); |
| net_connect_action = NULL; |
| } |
| |
| return_connect_action = NULL; |
| |
| if (read_buffer) { |
| free_MIOBuffer(read_buffer); |
| read_buffer = NULL; |
| } |
| |
| if (write_buffer) { |
| free_MIOBuffer(write_buffer); |
| write_buffer = NULL; |
| } |
| |
| if (process_event != NULL) { |
| process_event->cancel(); |
| process_event = NULL; |
| } |
| // If we are on the mux processor list, we must remove ourself |
| // before we can dealloc ourself |
| if (on_mux_list) { |
| if (try_processor_list_remove() == 0) { |
| SET_HANDLER(MuxVC::state_remove_from_list); |
| setup_process_event(10); |
| return; |
| } |
| } |
| |
| Debug("mux_alloc", "[%d] Killing MuxVC", id); |
| ink_debug_assert(on_mux_list == false); |
| mutex = NULL; |
| delete this; |
| } |
| |
| int |
| MuxVC::state_remove_from_list(int event, void *data) |
| { |
| |
| ink_debug_assert(data == process_event); |
| ink_debug_assert(event == EVENT_INTERVAL); |
| ink_debug_assert(on_mux_list); |
| |
| process_event = NULL; |
| if (try_processor_list_remove() == 0) { |
| setup_process_event(10); |
| } else { |
| kill(); |
| } |
| |
| return EVENT_DONE; |
| } |
| |
| int |
| MuxVC::try_processor_list_remove() |
| { |
| |
| MUTEX_TRY_LOCK(list_lock, muxProcessor.list_mutex, this_ethread()); |
| if (list_lock) { |
| muxProcessor.mux_list.remove(this); |
| on_mux_list = false; |
| return 1; |
| } |
| |
| return 0; |
| } |
| |
| // MuxClientVC* MuxVC::new_client(int32_t id_arg) |
| // |
| // Caller MUST be holding MuxVC::mutex |
| // |
| MuxClientVC * |
| MuxVC::new_client(int32_t id_arg) |
| { |
| |
| ink_debug_assert(magic == MUX_VC_MAGIC_ALIVE); |
| ink_release_assert(mutex->thread_holding == this_ethread()); |
| |
| if (connect_state == MUX_CONNECTED_IDLE) { |
| ink_debug_assert(process_event != NULL); |
| process_event->cancel(); |
| process_event = NULL; |
| connect_state = MUX_CONNECTED_ACTIVE; |
| SET_HANDLER(MuxVC::state_handle_mux); |
| } |
| ink_debug_assert(connect_state == MUX_CONNECTED_ACTIVE); |
| |
| MuxClientVC *new_client = NEW(new MuxClientVC); |
| |
| if (id_arg == 0) { |
| id_arg = next_client_id++; |
| enqueue_control_message(INKMUX_MSG_OPEN_CHANNEL, id_arg, 0); |
| } |
| |
| Debug("muxvc", "creating new client with id %d", id_arg); |
| Debug("mux_alloc", "[%d,%d] Creating new mux client id", this->id, id_arg); |
| |
| new_client->init(this, id_arg); |
| |
| num_clients++; |
| active_clients.push(new_client); |
| |
| ink_debug_assert(on_list(new_client) == true); |
| |
| return new_client; |
| } |
| |
| // void MuxVC::remove_client(MuxClientVC* client) |
| // |
| // Callee must be holding this->mutex |
| // |
| void |
| MuxVC::remove_client(MuxClientVC * client) |
| { |
| |
| ink_debug_assert(mutex->thread_holding == this_ethread()); |
| |
| num_clients--; |
| active_clients.remove(client); |
| |
| if (!client->other_side_closed & MUX_OCLOSE_CHANNEL_EVENT) { |
| enqueue_control_message(INKMUX_MSG_CLOSE_CHANNEL, client->id, 0); |
| } |
| |
| Debug("mux_alloc", "[%d,%d] Remvoing mux client id", this->id, client->id); |
| client->kill(); |
| |
| // If we're out of clients, we either need to go into an idle state |
| // or kill ourselves |
| if (num_clients == 0) { |
| switch (connect_state) { |
| case MUX_CONNECTED_ACTIVE: |
| Debug("muxvc", "[%d] Setting muxVC to idle state", id); |
| connect_state = MUX_CONNECTED_IDLE; |
| SET_HANDLER(MuxVC::state_idle); |
| if (process_event) { |
| process_event->cancel(); |
| process_event = NULL; |
| } |
| setup_process_event(60000); // Fix me - make configurable |
| break; |
| case MUX_CONNECTION_DROPPED: |
| if (reentrancy_count == 0) { |
| kill(); |
| } else { |
| terminate_vc = true; |
| } |
| break; |
| default: |
| ink_release_assert(0); |
| break; |
| } |
| } |
| } |
| |
| // void MuxVC::enqueue_control_message(int msg_id, int32_t cid, int data_size) |
| // |
| // Builds a control message and inserts it on the write buffer |
| // |
| int |
| MuxVC::enqueue_control_message(int msg_id, int32_t cid, int data_size) |
| { |
| |
| ink_debug_assert(data_size + sizeof(MuxMessage) <= USHRT_MAX); |
| |
| MuxMessage mm; |
| Debug("mux_cntl", "enqueue_control_message: %s for %d", control_msg_id_to_string(msg_id), cid); |
| |
| mm.version = (uint8_t) INKMUX_PROTO_VERSION_0_1; |
| mm.msg_type = (uint8_t) msg_id; |
| mm.msg_len = (uint16_t) (sizeof(MuxMessage) + data_size); |
| mm.client_id = cid; |
| |
| write_buffer->write((char *) &mm, sizeof(MuxMessage)); |
| |
| if (write_vio && connect_state != MUX_CONNECTION_DROPPED) { |
| write_vio->reenable(); |
| } |
| |
| write_bytes_added += mm.msg_len; |
| |
| return (int) mm.msg_len; |
| }; |
| |
| |
| void |
| MuxVC::process_clients() |
| { |
| |
| ink_debug_assert(magic == MUX_VC_MAGIC_ALIVE); |
| |
| EThread *my_ethread = this_ethread(); |
| MuxClientVC *current = active_clients.head; |
| MuxClientVC *next = NULL; |
| |
| int locks_missed = 0; |
| int bytes_written = 0; |
| int count = 0; |
| |
| for (; current != NULL; current = next) { |
| |
| count++; |
| next = current->link.next; |
| |
| if (current->closed) { |
| this->remove_client(current); |
| continue; |
| } |
| |
| if (current->write_state.enabled) { |
| MUTEX_TRY_LOCK(wlock, current->write_state.vio.mutex, my_ethread); |
| |
| if (wlock) { |
| if (current->write_state.enabled) { |
| bytes_written += current->process_write(); |
| |
| if (current->closed) { |
| this->remove_client(current); |
| continue; |
| } |
| } |
| } else { |
| locks_missed++; |
| } |
| } |
| |
| if (current->read_state.enabled) { |
| MUTEX_TRY_LOCK(rlock, current->read_state.vio.mutex, my_ethread); |
| |
| if (rlock) { |
| if (current->read_state.enabled) { |
| current->process_read_state(); |
| |
| if (current->closed) { |
| this->remove_client(current); |
| continue; |
| } |
| } |
| } else { |
| locks_missed++; |
| } |
| } |
| } |
| |
| if (bytes_written > 0) { |
| Debug("muxvc", "MuxVC::process_clients - reenabling write, %d bytes added", bytes_written); |
| write_vio->reenable(); |
| } |
| |
| if (locks_missed > 0) { |
| setup_process_event(10); |
| } |
| } |
| |
| // MuxClientVC* MuxVC::find_client(int32_t client_id) |
| // |
| // Search the client list of a MuxClientVC |
| // matching clinet_id |
| // |
| MuxClientVC * |
| MuxVC::find_client(int32_t client_id) |
| { |
| |
| MuxClientVC *current = active_clients.head; |
| |
| while (current) { |
| if (current->id == client_id) { |
| return current; |
| } |
| current = current->link.next; |
| } |
| |
| return NULL; |
| } |
| |
| // void MuxVC::process_read_msg_body() |
| // |
| // Process the body of a data message to put the data on |
| // the client vc |
| // |
| void |
| MuxVC::process_read_msg_body() |
| { |
| |
| bool need_byte_bank = false; |
| bool need_flow_control = false; |
| MuxClientVC *client = NULL; |
| |
| ink_debug_assert(read_msg_state == MUX_READ_MSG_BODY); |
| |
| int avail = read_buffer_reader->read_avail(); |
| if (avail > 0) { |
| |
| if (!discard_read_data) { |
| client = find_client(current_msg_hdr.client_id); |
| |
| if (!client) { |
| // No client - send a reset message to remote side |
| discard_read_data = true; |
| enqueue_control_message(INKMUX_MSG_CHANNEL_RESET, current_msg_hdr.client_id); |
| } else { |
| |
| if (client->read_state.vio.op != VIO::READ || !client->read_state.vio.mutex) { |
| // No active read |
| need_byte_bank = need_flow_control = true; |
| goto ADD_TO_BYTE_BANK; |
| } |
| |
| |
| MUTEX_TRY_LOCK(lock, client->read_state.vio.mutex, this_ethread()); |
| if (lock) { |
| |
| if (client->closed) { |
| discard_read_data = true; |
| enqueue_control_message(INKMUX_MSG_CHANNEL_RESET, current_msg_hdr.client_id); |
| } else if (client->read_state.shutdown) { |
| discard_read_data = true; |
| } else { |
| |
| // Process the outstanding byte bank |
| if (client->read_byte_bank) { |
| int res = client->process_byte_bank(); |
| if (client->closed) { |
| this->remove_client(client); |
| return; |
| } else if (res > 0) { |
| // If there is still data on the byte bank, |
| // all new data must go to the byte bank |
| // as well |
| need_byte_bank = true; |
| goto ADD_TO_BYTE_BANK; |
| } |
| } |
| // We've got the lock and the clinet is not closed |
| // or shutdown. See how much data we can shove on the |
| // client's buffer |
| int left_in_msg = current_msg_hdr.msg_len - read_msg_ndone; |
| int act_on = MIN(avail, left_in_msg); |
| int64_t vio_todo = client->read_state.vio.ntodo(); |
| |
| // If available bytes execeeds amount the I/O requested |
| // on the local side, tell the other side to stop sending |
| if (act_on > vio_todo) { |
| need_byte_bank = true; |
| act_on = vio_todo; |
| need_flow_control = true; |
| } |
| // User has set nbytes to ndone so we have |
| // don't have any data to more |
| if (vio_todo == 0) { |
| client->read_state.enabled = 0; |
| goto ADD_TO_BYTE_BANK; |
| } |
| |
| Debug("muxvc", "reading %d bytes of %d for %d", |
| act_on, (int) current_msg_hdr.msg_len, current_msg_hdr.client_id); |
| |
| mux_move_data(client->read_state.vio.buffer.writer(), read_buffer_reader, act_on); |
| |
| client->read_state.vio.ndone += act_on; |
| read_msg_ndone += act_on; |
| |
| int event; |
| if (client->read_state.vio.ntodo() == 0) { |
| event = VC_EVENT_READ_COMPLETE; |
| } else { |
| event = VC_EVENT_READ_READY; |
| |
| MIOBuffer *client_buf = client->read_state.vio.buffer.writer(); |
| if (client_buf->high_water() && client_buf->max_read_avail() >= client_buf->block_size()) { |
| need_flow_control = 1; |
| } |
| } |
| Debug("muxvc", "[MuxVC::process_read_msg_body] callback for [%d,%d]" |
| " ndone %d, nbytes %d", id, current_msg_hdr.client_id, |
| client->read_state.vio.ndone, client->read_state.vio.nbytes); |
| |
| client->update_inactive_timeout(); |
| client->reentrancy_count++; |
| client->read_state.vio._cont->handleEvent(event, &client->read_state.vio); |
| client->reentrancy_count--; |
| |
| if (client->closed) { |
| this->remove_client(client); |
| return; |
| } |
| } |
| } else { |
| need_byte_bank = true; |
| } |
| } |
| } |
| // If the client isn't available or has closed or shutdown reading, |
| // discard the input data |
| if (discard_read_data) { |
| ink_debug_assert(need_byte_bank == false); |
| int left_in_msg = current_msg_hdr.msg_len - read_msg_ndone; |
| int act_on = MIN(avail, left_in_msg); |
| read_buffer_reader->consume(act_on); |
| read_msg_ndone += act_on; |
| } |
| |
| ADD_TO_BYTE_BANK: |
| if (need_byte_bank) { |
| ink_debug_assert(discard_read_data == false); |
| |
| // Either missed the lock or bytes sent exceeds the amount the client asked |
| // for. Need to store in byte until client is ready for these bytes |
| if (client->read_byte_bank == NULL) { |
| client->read_byte_bank = new_MIOBuffer(BUFFER_SIZE_INDEX_1K); |
| client->byte_bank_reader = client->read_byte_bank->alloc_reader(); |
| |
| } |
| // Above operations could have changed avail so get |
| // fresh info |
| avail = read_buffer_reader->read_avail(); |
| int left_in_msg = current_msg_hdr.msg_len - read_msg_ndone; |
| int act_on = MIN(avail, left_in_msg); |
| |
| Debug("muxvc", "adding %d bytes to byte bank for [%d,%d]", act_on, id, current_msg_hdr.client_id); |
| Debug("mux_bank", "adding %d bytes to byte bank for [%d,%d]", act_on, id, current_msg_hdr.client_id); |
| |
| mux_move_data(client->read_byte_bank, read_buffer_reader, act_on); |
| read_msg_ndone += act_on; |
| |
| if (client->byte_bank_reader->read_avail() > MUX_MAX_BYTES_BANK) { |
| need_flow_control = true; |
| } |
| setup_process_event(10); |
| } |
| |
| /*if (need_flow_control) { |
| enqueue_control_message(INKMUX_MSG_FLOW_CONTROL_START, |
| current_msg_hdr.client_id); |
| client->read_state.flow_stopped = 1; |
| client->read_state.enabled = 0; |
| } |
| */ |
| |
| if (read_msg_ndone == current_msg_hdr.msg_len) { |
| Debug("muxvc", "completed read of normal data for id %d len %d", |
| current_msg_hdr.client_id, (int) current_msg_hdr.msg_len); |
| reset_read_msg_state(); |
| } |
| } |
| } |
| |
| // void MuxVC::process_read_data() |
| // |
| // Loops over input stream and process messages |
| // |
| void |
| MuxVC::process_read_data() |
| { |
| |
| while (read_buffer_reader->read_avail() > 0) { |
| |
| if (read_msg_state == MUX_READ_MSG_HEADER) { |
| char *copy_to = ((char *) (¤t_msg_hdr)) + read_msg_ndone; |
| int act_on = sizeof(MuxMessage) - read_msg_ndone; |
| ink_debug_assert(act_on > 0); |
| |
| int res = read_buffer_reader->read(copy_to, act_on); |
| read_msg_ndone += res; |
| |
| if (read_msg_ndone == sizeof(MuxMessage)) { |
| |
| if (current_msg_hdr.msg_type != INKMUX_MSG_NORMAL_DATA) { |
| |
| process_control_message(); |
| reset_read_msg_state(); |
| } else { |
| |
| // Check for bogus zero body length |
| if (current_msg_hdr.msg_len == read_msg_ndone) { |
| reset_read_msg_state(); |
| continue; |
| } |
| |
| read_msg_state = MUX_READ_MSG_BODY; |
| } |
| } |
| } |
| |
| if (read_msg_state == MUX_READ_MSG_BODY) { |
| Debug("muxvc", "control msg - normal data for %d len %d", |
| current_msg_hdr.client_id, (int) current_msg_hdr.msg_len); |
| process_read_msg_body(); |
| } |
| } |
| } |
| |
| void |
| MuxVC::process_control_message() |
| { |
| |
| MuxClientVC *client = find_client(current_msg_hdr.client_id); |
| |
| int msg_type = (int) current_msg_hdr.msg_type; |
| |
| Debug("mux_cntl", "control msg %s for %d", control_msg_id_to_string(msg_type), current_msg_hdr.client_id); |
| |
| switch (msg_type) { |
| case INKMUX_MSG_OPEN_CHANNEL: |
| process_channel_open(); |
| break; |
| case INKMUX_MSG_CLOSE_CHANNEL: |
| { |
| if (client) { |
| client->other_side_closed |= |
| (MUX_OCLOSE_CHANNEL_EVENT | MUX_OCLOSE_NEED_READ_NOTIFY | MUX_OCLOSE_NEED_WRITE_NOTIFY); |
| process_channel_close(client); |
| |
| if (client->closed) { |
| remove_client(client); |
| } |
| } |
| break; |
| } |
| case INKMUX_MSG_CHANNEL_RESET: |
| break; |
| case INKMUX_MSG_FLOW_CONTROL_START: |
| client->write_state.flow_stopped = 1; |
| break; |
| case INKMUX_MSG_FLOW_CONTROL_STOP: |
| client->write_state.flow_stopped = 0; |
| process_clients(); |
| break; |
| case INKMUX_MSG_SHUTDOWN_WRITE: |
| if (client) { |
| client->other_side_closed |= (MUX_OCLOSE_WRITE_EVENT | MUX_OCLOSE_NEED_READ_NOTIFY); |
| process_channel_inbound_shutdown(client); |
| |
| if (client->closed) { |
| remove_client(client); |
| } |
| } |
| break; |
| |
| default: |
| ink_release_assert(0); |
| } |
| |
| } |
| |
| void |
| MuxVC::process_channel_open() |
| { |
| |
| if (!return_accept_action.continuation) { |
| enqueue_control_message(INKMUX_MSG_CLOSE_CHANNEL, current_msg_hdr.client_id, 0); |
| return; |
| } |
| // Right now, only the initiating side can create sessions due to |
| // how the id's are managed. If we're receiving a session, we |
| // could not have ever created one |
| ink_release_assert(next_client_id == 1); |
| |
| MuxClientVC *new_vc = new_client(current_msg_hdr.client_id); |
| |
| |
| EThread *my_ethread = this_ethread(); |
| |
| // FIX - nasty hack - need to understand why we need netvc->thread ptr |
| new_vc->thread = my_ethread; |
| |
| // FIX - should try locks and retries here |
| // Fix - need check if we have a mutex before taking it |
| // MUTEX_TAKE_LOCK(return_accept_action.mutex, my_ethread); |
| |
| if (!return_accept_action.cancelled) { |
| |
| return_accept_action.continuation->handleEvent(NET_EVENT_ACCEPT, new_vc); |
| } |
| // Mutex_unlock(return_accept_action.mutex, my_ethread); |
| } |
| |
| // void MuxVC::process_channel_close(MuxClientVC* client) |
| // |
| // Handles sending EOS & ERROR events to the client when the |
| // the other side closed the channel |
| // |
| // CALLER is responsible for handling reentrant closes |
| // |
| void |
| MuxVC::process_channel_close(MuxClientVC * client) |
| { |
| |
| EThread *my_ethread = this_ethread(); |
| |
| if (client->other_side_closed & MUX_OCLOSE_NEED_READ_NOTIFY) { |
| |
| if (client->read_state.vio.mutex) { |
| MUTEX_TRY_LOCK(rlock, client->read_state.vio.mutex, my_ethread); |
| |
| if (rlock) { |
| if ( //client->read_state.enabled && |
| !client->closed && !client->read_byte_bank) { |
| // If the read is active and there's no byte bank, |
| // notify of the close. We need to defer on a byte |
| // bank as that data needs to be processed before shutting |
| // down the channel |
| client->process_channel_close_for_read(); |
| |
| // If the client closed on the callback, bail |
| if (client->closed) { |
| return; |
| } |
| } |
| } else { |
| setup_process_event(10); |
| } |
| } else { |
| client->other_side_closed &= ~MUX_OCLOSE_NEED_READ_NOTIFY; |
| } |
| } |
| |
| if (client->other_side_closed & MUX_OCLOSE_NEED_WRITE_NOTIFY) { |
| |
| if (client->write_state.vio.mutex) { |
| MUTEX_TRY_LOCK(wlock, client->write_state.vio.mutex, my_ethread); |
| |
| if (wlock) { |
| if ( //client->write_state.enabled && |
| !client->closed) { |
| client->process_channel_close_for_write(); |
| |
| // If the client closed on the callback, bail |
| if (client->closed) { |
| return; |
| } |
| } |
| |
| } else { |
| setup_process_event(10); |
| } |
| } else { |
| client->other_side_closed &= ~MUX_OCLOSE_NEED_WRITE_NOTIFY; |
| } |
| } |
| } |
| |
| |
| void |
| MuxVC::process_channel_inbound_shutdown(MuxClientVC * client) |
| { |
| |
| EThread *my_ethread = this_ethread(); |
| |
| ink_debug_assert(client->other_side_closed & MUX_OCLOSE_NEED_READ_NOTIFY); |
| |
| if (client->read_state.vio.mutex) { |
| MUTEX_TRY_LOCK(rlock, client->read_state.vio.mutex, my_ethread); |
| |
| if (rlock) { |
| if (client->read_state.enabled && !client->closed && !client->read_byte_bank) { |
| // If the read is active and there's no byte bank, |
| // notify of the close. We need to defer on a byte |
| // bank as that data needs to be processed before shutting |
| // down the channel |
| client->process_channel_close_for_read(); |
| |
| // If the client closed on the callback, bail |
| if (client->closed) { |
| return; |
| } |
| } |
| } else { |
| setup_process_event(10); |
| } |
| } else { |
| client->other_side_closed &= ~MUX_OCLOSE_NEED_READ_NOTIFY; |
| } |
| } |
| |
| void |
| MuxVC::reset_read_msg_state() |
| { |
| read_msg_state = MUX_READ_MSG_HEADER; |
| read_msg_size = 0; |
| read_msg_ndone = 0; |
| discard_read_data = false; |
| |
| memset(¤t_msg_hdr, 0, sizeof(MuxMessage)); |
| } |
| |
| void |
| MuxVC::setup_process_event(int ms) |
| { |
| if (!process_event) { |
| if (ms > 0) { |
| process_event = eventProcessor.schedule_in(this, HRTIME_MSECONDS(ms)); |
| } else { |
| process_event = eventProcessor.schedule_imm(this); |
| } |
| } |
| } |
| |
| bool |
| MuxVC::write_high_water() |
| { |
| if (write_bytes_added - write_vio->ndone > MUX_WRITE_HIGH_WATER && 0) { |
| return true; |
| } else { |
| return false; |
| } |
| } |
| |
| void |
| MuxVC::cleanup_on_error() |
| { |
| |
| ink_debug_assert(connect_state == MUX_CONNECTION_DROPPED); |
| |
| MuxClientVC *current = active_clients.head; |
| MuxClientVC *next = NULL; |
| |
| reentrancy_count++; |
| |
| Debug("muxvc", "[MuxVC::cleanup_on_error] for %d", id); |
| |
| if (num_clients == 0) { |
| terminate_vc = true; |
| } else { |
| if (clients_notified_of_error == false) { |
| // Loop over the clients trying to kill them off |
| for (; current != NULL; current = next) { |
| |
| next = current->link.next; |
| |
| if (!current->closed) { |
| |
| // Take care not to renotify if notification has already |
| // received |
| if (!(current->other_side_closed & MUX_OCLOSE_INBOUND_MASK)) { |
| current->other_side_closed |= MUX_OCLOSE_NEED_READ_NOTIFY; |
| } |
| |
| if (!(current->other_side_closed & MUX_OCLOSE_OUTBOUND_MASK)) { |
| current->other_side_closed |= MUX_OCLOSE_NEED_WRITE_NOTIFY; |
| } |
| |
| current->other_side_closed |= MUX_OCLOSE_CHANNEL_EVENT; |
| process_channel_close(current); |
| } |
| |
| if (current->closed) { |
| remove_client(current); |
| } |
| } |
| clients_notified_of_error = true; |
| } |
| } |
| |
| reentrancy_count--; |
| if (terminate_vc && reentrancy_count == 0) { |
| kill(); |
| } |
| } |
| |
| // int MuxVC::state_teardown(int event, void* data) |
| // |
| // We're waiting for everything in the write |
| // buffer to be sent |
| // |
| int |
| MuxVC::state_teardown(int event, void *data) |
| { |
| |
| Debug("muxvc", "state_teardown: event %d", event); |
| reentrancy_count++; |
| |
| switch (event) { |
| case VC_EVENT_WRITE_COMPLETE: |
| ink_debug_assert(data == write_vio); |
| terminate_vc = true; |
| break; |
| case VC_EVENT_WRITE_READY: |
| // Ignore |
| break; |
| default: |
| ink_release_assert(0); |
| } |
| |
| reentrancy_count--; |
| if (terminate_vc && reentrancy_count == 0) { |
| kill(); |
| } |
| |
| return EVENT_DONE; |
| } |
| |
| int |
| MuxVC::state_idle(int event, void *data) |
| { |
| |
| int r = EVENT_DONE; |
| |
| ink_release_assert(magic == MUX_VC_MAGIC_ALIVE); |
| ink_debug_assert(connect_state == MUX_CONNECTED_IDLE); |
| ink_debug_assert(num_clients == 0); |
| |
| Debug("muxvc", "state_idle: event %d", event); |
| reentrancy_count++; |
| |
| switch (event) { |
| case EVENT_INTERVAL: |
| case EVENT_IMMEDIATE: |
| { |
| ink_debug_assert(process_event == data); |
| process_event = NULL; |
| connect_state = MUX_CONNECTED_TEARDOWN; |
| |
| int avail = write_vio->get_reader()->read_avail(); |
| if (avail == 0) { |
| // All data sent. We're done. |
| terminate_vc = true; |
| } else { |
| // We need to flush data |
| SET_HANDLER(state_teardown); |
| write_vio->nbytes = write_vio->ndone + avail; |
| |
| // check for rollover in bytes |
| if (write_vio->nbytes < 0 || write_vio->nbytes == INT64_MAX) { |
| write_vio = net_vc->do_io_write(this, avail, write_vio->get_reader()); |
| } else { |
| write_vio->reenable(); |
| } |
| |
| // We don't want to hear from the read side anymore |
| // |
| // FIX ME - Is there a race between new sessions |
| // being opened from the remote and the |
| // shutdown being issues? |
| net_vc->do_io_shutdown(IO_SHUTDOWN_READ); |
| read_vio = NULL; |
| |
| // FIX ME - should set inactivity timeout here to |
| // prevnt stuff from hanging around forever |
| } |
| break; |
| } |
| default: |
| // Forward to the standard mux handle since |
| // this event should be coming from |
| // the underlying netvc |
| r = state_handle_mux(event, data); |
| break; |
| } |
| reentrancy_count--; |
| if (terminate_vc && reentrancy_count == 0) { |
| kill(); |
| } |
| |
| return r; |
| } |
| |
| |
| |
| int |
| MuxVC::state_handle_mux_down(int event, void *data) |
| { |
| |
| ink_release_assert(magic == MUX_VC_MAGIC_ALIVE); |
| |
| Debug("muxvc", "state_handle_mux_down: event %d", event); |
| reentrancy_count++; |
| |
| switch (event) { |
| case EVENT_INTERVAL: |
| case EVENT_IMMEDIATE: |
| ink_debug_assert(process_event == data); |
| process_event = NULL; |
| cleanup_on_error(); |
| break; |
| default: |
| ink_release_assert(0); |
| break; |
| } |
| reentrancy_count--; |
| |
| if (terminate_vc && reentrancy_count == 0) { |
| kill(); |
| } |
| |
| return EVENT_CONT; |
| } |
| |
| int |
| MuxVC::state_handle_mux(int event, void *data) |
| { |
| |
| ink_release_assert(magic == MUX_VC_MAGIC_ALIVE); |
| |
| Debug("muxvc", "state_handle_mux: event %d", event); |
| reentrancy_count++; |
| |
| switch (event) { |
| case VC_EVENT_WRITE_COMPLETE: |
| // We hit INT64_MAX bytes. Reset the I/O |
| ink_debug_assert(data == write_vio); |
| ink_debug_assert(write_vio->ndone == INT64_MAX); |
| write_bytes_added -= write_vio->ndone; |
| write_vio = net_vc->do_io_write(this, INT64_MAX, write_vio->buffer.reader()); |
| // FALL THROUGH |
| case VC_EVENT_WRITE_READY: |
| ink_debug_assert(data == write_vio); |
| Debug("muxvc", "state_handle_mux: WRITE_READY, ndone: %d", write_vio->ndone); |
| |
| if (writes_blocked) { |
| writes_blocked = false; |
| process_clients(); |
| } |
| break; |
| case VC_EVENT_READ_COMPLETE: |
| // We hit INT64_MAX bytes. Reset the I/O |
| ink_debug_assert(data == read_vio); |
| read_vio = net_vc->do_io_read(this, INT64_MAX, read_buffer); |
| // FALL THROUGH |
| case VC_EVENT_READ_READY: |
| ink_debug_assert(data == read_vio); |
| /*Debug("muxvc", "state_handle_mux: READ_READY, ndone: %d, " |
| "avail %d, high_water %d", |
| read_vio->ndone, |
| read_buffer_reader->read_avail(), |
| (int) read_vio->buffer.writer()->high_water()); */ |
| process_read_data(); |
| read_vio->reenable(); |
| break; |
| case EVENT_INTERVAL: |
| case EVENT_IMMEDIATE: |
| /*Debug("muxvc", "state_handle_mux: read_side: ndone: %d, " |
| "avail %d, high_water %d", |
| read_vio->ndone, |
| read_buffer_reader->read_avail(), |
| read_buffer->max_read_avail(), |
| (int) read_buffer->high_water()); */ |
| ink_debug_assert(process_event == data); |
| process_event = NULL; |
| process_clients(); |
| read_vio->reenable(); |
| break; |
| case VC_EVENT_ERROR: |
| case VC_EVENT_EOS: |
| case VC_EVENT_INACTIVITY_TIMEOUT: |
| case VC_EVENT_ACTIVE_TIMEOUT: |
| net_vc->do_io_close(0); |
| net_vc = NULL; |
| connect_state = MUX_CONNECTION_DROPPED; |
| SET_HANDLER(MuxVC::state_handle_mux_down); |
| cleanup_on_error(); |
| break; |
| default: |
| ink_release_assert(0); |
| } |
| |
| reentrancy_count--; |
| if (terminate_vc && reentrancy_count == 0) { |
| kill(); |
| } |
| |
| return EVENT_CONT; |
| } |
| |
| unsigned int |
| MuxVC::get_remote_ip() |
| { |
| return (unsigned int) remote_addr.sin_addr.s_addr; |
| } |
| |
| unsigned int |
| MuxVC::get_remote_port() |
| { |
| return (unsigned int) remote_addr.sin_port; |
| } |
| |
| MuxAcceptor::MuxAcceptor(): |
| accept_action(NULL), call_cont(NULL), Continuation(new_ProxyMutex()) |
| { |
| } |
| |
| MuxAcceptor::~MuxAcceptor() |
| { |
| if (accept_action) { |
| accept_action->cancel(); |
| accept_action = NULL; |
| } |
| } |
| |
| void |
| MuxAcceptor::init(int port, Continuation * c) |
| { |
| SET_HANDLER(MuxAcceptor::accept_handler); |
| accept_action = netProcessor.accept(this, port); |
| call_cont = c; |
| } |
| |
| int |
| MuxAcceptor::accept_handler(int event, void *data) |
| { |
| switch (event) { |
| case NET_EVENT_ACCEPT: |
| { |
| MuxVC *new_vc = NEW(new MuxVC); |
| Debug("muxvc", "Created new MuxVC @ 0x%x", new_vc); |
| new_vc->init_from_accept((NetVConnection *) data, call_cont); |
| break; |
| } |
| default: |
| ink_release_assert(0); |
| break; |
| } |
| return 0; |
| } |
| |
| MuxProcessor muxProcessor; |
| |
| MuxProcessor::MuxProcessor(): |
| list_mutex(NULL), mux_list() |
| { |
| } |
| |
| MuxProcessor::~MuxProcessor() |
| { |
| } |
| |
| int |
| MuxProcessor::start() |
| { |
| list_mutex = new_ProxyMutex(); |
| |
| HttpAccept *http_accept = NEW(new HttpAccept(SERVER_PORT_DEFAULT)); |
| |
| MuxAcceptor *new_accept = NEW(new MuxAcceptor); |
| new_accept->init(9444, http_accept); |
| |
| mux_pages_init(); |
| |
| return 0; |
| } |
| |
| // void MuxProcessor::find_mux_internal(unsigned int ip, int port, MuxClient** rmux) |
| // |
| // searches the existing mux list for a mux matching ip, port |
| // |
| // if a matching mux is found, calls back c with it |
| // and returns MUX_FIND_FOUND |
| // |
| // if no matching mux can be found returns |
| // MUX_FIND_NOT_FOUND |
| // |
| // if the search could not be compelted due to a lock |
| // miss, returns MUX_FIND_RETRY |
| // |
| // |
| MuxFindResult_t MuxProcessor::find_mux_internal(Continuation * c, unsigned int ip, int port) |
| { |
| |
| |
| EThread * |
| my_ethread = this_ethread(); |
| MUTEX_TRY_LOCK(list_lock, list_mutex, my_ethread); |
| |
| if (!list_lock) { |
| return MUX_FIND_RETRY; |
| } |
| MuxVC * |
| current = mux_list.head; |
| MuxVC * |
| next = NULL; |
| |
| for (; current != NULL; current = next) { |
| next = current->link.next; |
| |
| if (current->get_remote_ip() == ip && current->get_remote_port() == port) { |
| |
| MUTEX_TRY_LOCK(clock, current->mutex, my_ethread); |
| if (!clock) { |
| return MUX_FIND_RETRY; |
| } |
| |
| if ((current->connect_state == MUX_CONNECTED_ACTIVE || |
| current->connect_state == MUX_CONNECTED_IDLE) && |
| (!is_action_tag_set("mux_limit") || current->num_clients <= 10)) { |
| MuxClientVC * |
| new_client = current->new_client(); |
| Debug("mux_open", "mux_find_internal cb with 0x%x", new_client); |
| c->handleEvent(NET_EVENT_OPEN, new_client); |
| return MUX_FIND_FOUND; |
| } |
| } |
| } |
| |
| return MUX_FIND_NOT_FOUND; |
| } |
| |
| Action * |
| MuxProcessor::get_mux_re(Continuation * c, unsigned int ip, int port) |
| { |
| |
| MuxGetCont *mgc = NULL; |
| |
| Debug("mux_open", "get_mux_re called for 0x%x", c); |
| if (port == 0) { |
| port = 9444; |
| } |
| |
| MuxClientVC *new_mux = NULL; |
| |
| MuxFindResult_t r = find_mux_internal(c, ip, port); |
| |
| switch (r) { |
| case MUX_FIND_FOUND: |
| return ACTION_RESULT_DONE; |
| case MUX_FIND_NOT_FOUND: |
| mgc = NEW(new MuxGetCont); |
| return mgc->init_for_new_mux(c, ip, port); |
| case MUX_FIND_RETRY: |
| // Lock miss while searching mux list so retry |
| mgc = NEW(new MuxGetCont); |
| return mgc->init_for_lock_miss(c, ip, port); |
| default: |
| ink_release_assert(0); |
| } |
| } |
| |
| MuxGetCont::MuxGetCont(): |
| return_action(), mux_action(NULL), mux_vc(NULL), retry_event(NULL), ip(0), port(0) |
| { |
| } |
| |
| MuxGetCont::~MuxGetCont() |
| { |
| ink_debug_assert(mux_action == NULL); |
| |
| if (retry_event) { |
| retry_event->cancel(); |
| retry_event = NULL; |
| } |
| |
| *((Action *) & return_action) = NULL; |
| mutex = NULL; |
| } |
| |
| Action * |
| MuxGetCont::init_for_lock_miss(Continuation * c, unsigned int ip_arg, int port_arg) |
| { |
| this->mutex = c->mutex; |
| *((Action *) & return_action) = c; |
| |
| SET_HANDLER(MuxGetCont::lock_miss_handler); |
| |
| ip = ip_arg; |
| port = port_arg; |
| |
| retry_event = eventProcessor.schedule_in(this, HRTIME_MSECONDS(10)); |
| |
| return &return_action; |
| } |
| |
| Action * |
| MuxGetCont::init_for_new_mux(Continuation * c, unsigned int ip_arg, int port_arg) |
| { |
| this->mutex = c->mutex; |
| *((Action *) & return_action) = c; |
| |
| SET_HANDLER(MuxGetCont::new_mux_handler); |
| |
| mux_vc = NEW(new MuxVC); |
| mux_vc->init(); |
| |
| // Using take lock since it's a brand new mutex |
| ProxyMutexPtr mref = mux_vc->mutex; |
| MUTEX_TAKE_LOCK(mref, c->mutex->thread_holding); |
| Action *tmp = mux_vc->do_connect(this, ip_arg, port_arg); |
| Mutex_unlock(mref, mref->thread_holding); |
| |
| if (tmp != ACTION_RESULT_DONE) { |
| mux_action = tmp; |
| return &return_action; |
| } else { |
| return ACTION_RESULT_DONE; |
| } |
| } |
| |
| int |
| MuxGetCont::lock_miss_handler(int event, void *data) |
| { |
| |
| Event *call_event = (Event *) data; |
| |
| ink_release_assert(event == EVENT_INTERVAL); |
| ink_debug_assert(retry_event == call_event); |
| ink_debug_assert(this->mutex.m_ptr == return_action.mutex.m_ptr); |
| |
| retry_event = NULL; |
| |
| // Note, we've already got the continuation's mutex |
| // since we set our mutex to it's mutex |
| if (return_action.cancelled) { |
| ink_debug_assert(mux_action == NULL); |
| retry_event = NULL; |
| delete this; |
| return EVENT_DONE; |
| } |
| |
| MuxFindResult_t r = muxProcessor.find_mux_internal(return_action.continuation, |
| ip, port); |
| |
| switch (r) { |
| case MUX_FIND_FOUND: |
| break; |
| case MUX_FIND_NOT_FOUND:{ |
| // Could not find a mux so create a new one |
| SET_HANDLER(MuxGetCont::new_mux_handler); |
| mux_vc = NEW(new MuxVC); |
| mux_vc->init(); |
| |
| Action *tmp = mux_vc->do_connect(this, ip, port); |
| if (tmp != ACTION_RESULT_DONE) { |
| mux_action = tmp; |
| } |
| break; |
| } |
| case MUX_FIND_RETRY: |
| // Lock miss while searching mux list so retry |
| retry_event = call_event; |
| call_event->schedule_in(HRTIME_MSECONDS(10)); |
| return EVENT_DONE; |
| default: |
| ink_release_assert(0); |
| } |
| |
| return EVENT_DONE; |
| } |
| |
| |
| int |
| MuxGetCont::new_mux_handler(int event, void *data) |
| { |
| |
| mux_action = NULL; |
| |
| switch (event) { |
| case MUX_EVENT_OPEN: |
| { |
| ink_debug_assert(mux_vc->connect_state == MUX_CONNECTED_ACTIVE); |
| Debug("mux_open", "[MuxGetCont::main_handler sending] adding to mux list"); |
| |
| |
| // Fix - use try_lock & retry |
| MUTEX_TAKE_LOCK(muxProcessor.list_mutex, this_ethread()); |
| muxProcessor.mux_list.push(mux_vc); |
| mux_vc->on_mux_list = true; |
| Mutex_unlock(muxProcessor.list_mutex, muxProcessor.list_mutex->thread_holding); |
| |
| if (!return_action.cancelled) { |
| MuxClientVC *new_client = mux_vc->new_client(); |
| Debug("mux_open", "[MuxGetCont::main_handler sending] callbak with NET_EVENT_OPEN"); |
| return_action.continuation->handleEvent(NET_EVENT_OPEN, new_client); |
| } |
| break; |
| } |
| case MUX_EVENT_OPEN_FAILED: |
| { |
| Debug("mux_open", "[MuxGetCont::main_handler sending] callbak with NET_EVENT_OPEN_FAILED"); |
| return_action.continuation->handleEvent(NET_EVENT_OPEN_FAILED, NULL); |
| break; |
| } |
| default: |
| ink_release_assert(0); |
| } |
| |
| delete this; |
| return 0; |
| } |
| |
| /************************************************************* |
| * |
| * STAT PAGES STUFF |
| * |
| **************************************************************/ |
| |
| class MuxPagesHandler:public BaseStatPagesHandler |
| { |
| public: |
| MuxPagesHandler(Continuation * cont, HTTPHdr * header); |
| ~MuxPagesHandler(); |
| |
| int handle_muxvc_list(int event, void *edata); |
| int handle_callback(int event, void *edata); |
| int handle_mux_details(int event, void *data); |
| |
| Action action; |
| char *request; |
| private: |
| Arena arena; |
| int32_t extract_id(const char *query); |
| void dump_mux(MuxVC * mvc); |
| void dump_mux_client(MuxClientVC * client); |
| }; |
| |
| MuxPagesHandler::MuxPagesHandler(Continuation * cont, HTTPHdr * header) |
| :BaseStatPagesHandler(new_ProxyMutex()), request(NULL) |
| { |
| action = cont; |
| |
| URL *url; |
| int length; |
| |
| url = header->url_get(); |
| request = (char *) url->path_get(&length); |
| request = arena.str_store(request, length); |
| |
| if (strncmp(request, "mux_details", sizeof("mux_details")) == 0) { |
| arena.str_free(request); |
| request = (char *) url->query_get(&length); |
| request = arena.str_store(request, length); |
| SET_HANDLER(&MuxPagesHandler::handle_mux_details); |
| } else { |
| SET_HANDLER(&MuxPagesHandler::handle_muxvc_list); |
| } |
| } |
| |
| MuxPagesHandler::~MuxPagesHandler() |
| { |
| } |
| |
| void |
| MuxPagesHandler::dump_mux_client(MuxClientVC * client) |
| { |
| |
| resp_begin_row(); |
| |
| resp_begin_column(); |
| resp_add("%d", client->id); |
| resp_end_column(); |
| |
| // Write VIO |
| resp_begin_column(); |
| resp_add("%d, %d", client->write_state.vio.nbytes, client->write_state.vio.ndone); |
| resp_end_column(); |
| |
| resp_begin_column(); |
| resp_add("%d, %d", client->write_state.enabled, client->write_state.shutdown); |
| resp_end_column(); |
| |
| |
| // Read VIO |
| resp_begin_column(); |
| resp_add("%d, %d", client->read_state.vio.nbytes, client->read_state.vio.ndone); |
| resp_end_column(); |
| |
| resp_begin_column(); |
| resp_add("%d, %d", client->read_state.enabled, client->read_state.shutdown); |
| resp_end_column(); |
| |
| resp_begin_column(); |
| resp_add("%d", (client->read_byte_bank) ? client->byte_bank_reader->read_avail() : 0); |
| resp_end_column(); |
| |
| resp_begin_column(); |
| resp_add("%d", client->other_side_closed); |
| resp_end_column(); |
| |
| resp_end_row(); |
| } |
| |
| void |
| MuxPagesHandler::dump_mux(MuxVC * mux) |
| { |
| |
| char *foo; |
| unsigned int ip = mux->get_remote_ip(); |
| unsigned char *ip_ptr = (unsigned char *) &ip; |
| |
| UnixNetVConnection *unet_vc = (UnixNetVConnection *) mux->net_vc; |
| |
| resp_begin("Mux details"); |
| |
| resp_add("<h3> Details for MuxVC Id %d </h3>\n", mux->id); |
| |
| resp_begin_item(); |
| resp_add("Connected to: %u.%u.%u.%u:%d", ip_ptr[0], ip_ptr[1], ip_ptr[2], ip_ptr[3], mux->get_remote_port()); |
| resp_end_item(); |
| |
| if (mux->process_event) { |
| resp_begin_item(); |
| resp_add("Process Event: 0x%X", mux->process_event); |
| resp_end_item(); |
| } |
| |
| resp_begin_item(); |
| resp_add("Number of active clients: %d", mux->num_clients); |
| resp_end_item(); |
| |
| if (mux->read_vio) { |
| resp_begin_item(); |
| resp_add("Read VIO: nbytes: %d, ndone %d, bytes avail %d", |
| mux->read_vio->nbytes, mux->read_vio->ndone, |
| (mux->read_buffer_reader != NULL) ? mux->read_buffer_reader->read_avail() : 0); |
| resp_end_item(); |
| |
| resp_begin_item(); |
| resp_add("Read Net State: enabled %d", unet_vc ? unet_vc->read.enabled : -1); |
| resp_end_item(); |
| } |
| |
| if (mux->write_vio) { |
| resp_begin_item(); |
| resp_add("Write VIO: nbytes: %d, ndone %d, in buffer bytes %d blocks %d ", |
| mux->write_vio->nbytes, mux->write_vio->ndone, |
| (mux->write_vio->buffer.entry != NULL) ? |
| mux->write_vio->buffer.entry->read_avail() : 0, |
| (mux->write_vio->buffer.entry != NULL) ? mux->write_vio->buffer.entry->block_count() : 0); |
| resp_end_item(); |
| |
| resp_begin_item(); |
| resp_add("Write Net State: enabled %d", unet_vc ? unet_vc->write.enabled : -1); |
| resp_end_item(); |
| } |
| |
| resp_add("<hr>\n"); |
| resp_add("<p> <h4> Clients: </h4> </p>"); |
| |
| resp_begin_table(1, 3, 100); |
| |
| resp_begin_row(); |
| resp_begin_column(); |
| resp_add("Id"); |
| resp_end_column(); |
| resp_begin_column(); |
| resp_add("Write Nybytes, NDone"); |
| resp_end_column(); |
| resp_begin_column(); |
| resp_add("Write E S"); |
| resp_end_column(); |
| resp_begin_column(); |
| resp_add("Read Nbytes, NDone"); |
| resp_end_column(); |
| resp_begin_column(); |
| resp_add("Read E S"); |
| resp_end_column(); |
| resp_begin_column(); |
| resp_add("Byte Bank Size"); |
| resp_end_column(); |
| resp_begin_column(); |
| resp_add("Other Close"); |
| resp_end_column(); |
| resp_end_row(); |
| |
| MuxClientVC *client = mux->active_clients.head; |
| MuxClientVC *next = NULL; |
| |
| for (; client != NULL; client = next) { |
| next = client->link.next; |
| dump_mux_client(client); |
| } |
| |
| resp_end_table(); |
| |
| |
| resp_end(); |
| } |
| |
| int |
| MuxPagesHandler::handle_mux_details(int event, void *data) |
| { |
| |
| ink_debug_assert(event == EVENT_IMMEDIATE || event == EVENT_INTERVAL); |
| Event *call_event = (Event *) data; |
| |
| int32_t mux_id = extract_id(request); |
| |
| if (mux_id < 0) { |
| resp_begin("Mux Pages Error"); |
| resp_add("<b>Unable to extract id</b>\n"); |
| resp_end(); |
| return handle_callback(EVENT_NONE, NULL); |
| } |
| |
| MUTEX_TRY_LOCK(pLock, muxProcessor.list_mutex, call_event->ethread); |
| |
| if (!pLock) { |
| eventProcessor.schedule_in(this, HRTIME_MSECONDS(10)); |
| return EVENT_DONE; |
| } |
| |
| MuxVC *mux = muxProcessor.mux_list.head; |
| MuxVC *next = NULL; |
| |
| for (; mux != NULL; mux = next) { |
| next = mux->link.next; |
| |
| if (mux->id == mux_id) { |
| break; |
| } |
| } |
| |
| if (mux == NULL) { |
| resp_begin("Mux Pages Error"); |
| resp_add("<b>Unable to find id %d</b>\n", mux_id); |
| resp_end(); |
| return handle_callback(EVENT_NONE, NULL); |
| } |
| |
| MUTEX_TRY_LOCK(mLock, mux->mutex, call_event->ethread); |
| if (!mLock) { |
| eventProcessor.schedule_in(this, HRTIME_MSECONDS(10)); |
| return EVENT_DONE; |
| } |
| |
| dump_mux(mux); |
| handle_callback(EVENT_NONE, NULL); |
| |
| return EVENT_DONE; |
| } |
| |
| int |
| MuxPagesHandler::handle_muxvc_list(int event, void *data) |
| { |
| |
| ink_debug_assert(event == EVENT_IMMEDIATE || event == EVENT_INTERVAL); |
| Event *call_event = (Event *) data; |
| |
| MUTEX_TRY_LOCK(lock, muxProcessor.list_mutex, call_event->ethread); |
| |
| if (!lock) { |
| eventProcessor.schedule_in(this, HRTIME_MSECONDS(10)); |
| return EVENT_DONE; |
| } |
| |
| resp_begin("MuxVC List"); |
| |
| MuxVC *mux = muxProcessor.mux_list.head; |
| MuxVC *next = NULL; |
| |
| for (; mux != NULL; mux = next) { |
| next = mux->link.next; |
| |
| unsigned int ip = mux->get_remote_ip(); |
| int port = mux->get_remote_port(); |
| unsigned char *ip_ptr = (unsigned char *) &ip; |
| |
| resp_begin_item(); |
| resp_add("id: <a href=\"http://{mux}/mux_details?id=%d\"> " |
| "%d </a> | %u.%u.%u.%u:%d | %d clients\n", |
| mux->id, mux->id, ip_ptr[0], ip_ptr[1], ip_ptr[2], ip_ptr[3], port, mux->num_clients); |
| resp_end_item(); |
| } |
| |
| resp_end(); |
| handle_callback(EVENT_NONE, NULL); |
| |
| return EVENT_DONE; |
| } |
| |
| int |
| MuxPagesHandler::handle_callback(int event, void *edata) |
| { |
| MUTEX_TRY_LOCK(trylock, action.mutex, this_ethread()); |
| if (!trylock) { |
| SET_HANDLER(&MuxPagesHandler::handle_callback); |
| eventProcessor.schedule_in(this, HRTIME_MSECONDS(10)); |
| return EVENT_DONE; |
| } |
| |
| if (!action.cancelled) { |
| if (response) { |
| StatPageData data; |
| |
| data.data = response; |
| data.type = NULL; |
| data.length = response_length; |
| response = NULL; |
| |
| action.continuation->handleEvent(STAT_PAGE_SUCCESS, &data); |
| } else { |
| action.continuation->handleEvent(STAT_PAGE_FAILURE, NULL); |
| } |
| } |
| |
| delete this; |
| return EVENT_DONE; |
| } |
| |
| int32_t |
| MuxPagesHandler::extract_id(const char *query) |
| { |
| char *p; |
| int32_t id; |
| |
| p = (char *) strstr(query, "id="); |
| if (!p) { |
| return -1; |
| } |
| p += sizeof("id=") - 1; |
| |
| id = ink_atoi(p); |
| |
| // Check to see if we found the id |
| if (id == 0) { |
| if (*p == '0' && *(p + 1) == '\0') { |
| return 0; |
| } else { |
| return -1; |
| } |
| } else { |
| return id; |
| } |
| } |
| |
| static Action * |
| mux_pages_callback(Continuation * cont, HTTPHdr * header) |
| { |
| MuxPagesHandler *handler; |
| |
| handler = NEW(new MuxPagesHandler(cont, header)); |
| eventProcessor.schedule_imm(handler, ET_CALL); |
| |
| return &handler->action; |
| } |
| |
| void |
| mux_pages_init() |
| { |
| statPagesManager.register_http("mux", mux_pages_callback); |
| } |
| |
| /************************************************************* |
| * |
| * REGRESSION TEST STUFF |
| * |
| **************************************************************/ |
| |
| class MUXTestDriver:public NetTestDriver |
| { |
| public: |
| MUXTestDriver(); |
| ~MUXTestDriver(); |
| |
| void start_tests(RegressionTest * r_arg, int *pstatus_arg); |
| int main_handler(int event, void *data); |
| |
| private: |
| MuxAcceptor * regress_accept; |
| Action *pending_action; |
| |
| int i; |
| int completions_received; |
| RegressionTest *r; |
| int *pstatus; |
| |
| void start_next_test(); |
| void start_active_side(NetVConnection * p_vc); |
| void start_passive_side(NetVConnection * p_vc); |
| }; |
| |
| MUXTestDriver::MUXTestDriver(): |
| regress_accept(NULL), pending_action(NULL), i(0), completions_received(0), r(NULL), pstatus(NULL), NetTestDriver() |
| { |
| } |
| |
| MUXTestDriver::~MUXTestDriver() |
| { |
| mutex = NULL; |
| |
| if (regress_accept) { |
| delete regress_accept; |
| regress_accept = NULL; |
| } |
| |
| if (pending_action) { |
| pending_action->cancel(); |
| pending_action = NULL; |
| } |
| } |
| |
| |
| void |
| MUXTestDriver::start_tests(RegressionTest * r_arg, int *pstatus_arg) |
| { |
| mutex = new_ProxyMutex(); |
| MUTEX_TRY_LOCK(lock, mutex, this_ethread()); |
| |
| r = r_arg; |
| pstatus = pstatus_arg; |
| |
| SET_HANDLER(&MUXTestDriver::main_handler); |
| |
| regress_accept = NEW(new MuxAcceptor); |
| regress_accept->init(9555, this); |
| |
| start_next_test(); |
| } |
| |
| |
| void |
| MUXTestDriver::start_next_test() |
| { |
| |
| int next_index = i * 2; |
| if (next_index >= num_netvc_tests) { |
| // We are done - // FIX - PASS or FAIL? |
| if (errors == 0) { |
| *pstatus = REGRESSION_TEST_PASSED; |
| } else { |
| *pstatus = REGRESSION_TEST_FAILED; |
| } |
| return; |
| } |
| |
| Debug("mux_test", "Starting test %s", netvc_tests_def[next_index].test_name); |
| completions_received = 0; |
| |
| ink_debug_assert(pending_action == NULL); |
| Action *tmp = muxProcessor.get_mux_re(this, inet_addr("127.0.0.1"), 9555); |
| |
| if (tmp != ACTION_RESULT_DONE) { |
| pending_action = tmp; |
| } |
| } |
| |
| void |
| MUXTestDriver::start_active_side(NetVConnection * a_vc) |
| { |
| int a_index = i * 2; |
| |
| NetVCTest *a = NEW(new NetVCTest); |
| a->init_test(NET_VC_TEST_ACTIVE, this, a_vc, r, &netvc_tests_def[a_index], "MuxVC", "mux_test_detail"); |
| a->start_test(); |
| } |
| |
| void |
| MUXTestDriver::start_passive_side(NetVConnection * p_vc) |
| { |
| int p_index = (i * 2) + 1; |
| |
| NetVCTest *p = NEW(new NetVCTest); |
| p->init_test(NET_VC_TEST_PASSIVE, this, p_vc, r, &netvc_tests_def[p_index], "MuxVC", "mux_test_detail"); |
| p->start_test(); |
| } |
| |
| |
| int |
| MUXTestDriver::main_handler(int event, void *data) |
| { |
| |
| Debug("mux_test_detail", "MUXTestDriver::main_handler recevied event %d", event); |
| |
| switch (event) { |
| case NET_EVENT_OPEN: |
| { |
| // We opened test mux vc so start testing |
| pending_action = NULL; |
| start_active_side((NetVConnection *) data); |
| break; |
| } |
| case NET_EVENT_OPEN_FAILED: |
| { |
| // Open of the test vc failed so give up |
| pending_action = NULL; |
| Warning("mux regression failed - could not open localhost muxvc"); |
| *pstatus = REGRESSION_TEST_FAILED; |
| delete this; |
| break; |
| } |
| case NET_EVENT_ACCEPT: |
| { |
| // New test client |
| start_passive_side((NetVConnection *) data); |
| break; |
| } |
| case EVENT_IMMEDIATE: |
| { |
| // Signifies a completion of one side of the test |
| completions_received++; |
| |
| if (completions_received == 2) { |
| i++; |
| start_next_test(); |
| } |
| break; |
| } |
| } |
| |
| return 0; |
| } |
| |
| REGRESSION_TEST(MUX) (RegressionTest * t, int atype, int *pstatus) { |
| MUXTestDriver *driver = NEW(new MUXTestDriver); |
| driver->start_tests(t, pstatus); |
| } |