| /** @file |
| |
| A brief file description |
| |
| @section license License |
| |
| Licensed to the Apache Software Foundation (ASF) under one |
| or more contributor license agreements. See the NOTICE file |
| distributed with this work for additional information |
| regarding copyright ownership. The ASF licenses this file |
| to you under the Apache License, Version 2.0 (the |
| "License"); you may not use this file except in compliance |
| with the License. You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, software |
| distributed under the License is distributed on an "AS IS" BASIS, |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| See the License for the specific language governing permissions and |
| limitations under the License. |
| */ |
| |
| #include "ink_unused.h" /* MAGIC_EDITING_TAG */ |
| |
| |
| /**************************************************************************** |
| |
| ICP.cc |
| |
| |
| ****************************************************************************/ |
| |
| #include "libts.h" |
| #include "Main.h" |
| #include "P_EventSystem.h" |
| #include "P_Cache.h" |
| #include "P_Net.h" |
| #include "MgmtUtils.h" |
| #include "P_RecProcess.h" |
| #include "ICP.h" |
| #include "ICPProcessor.h" |
| #include "ICPlog.h" |
| #include "logging/Log.h" |
| #include "logging/LogAccessICP.h" |
| #include "BaseManager.h" |
| #include "HdrUtils.h" |
| |
| extern CacheLookupHttpConfig global_cache_lookup_config; |
| HTTPHdr gclient_request; |
| |
| //**************************************************************************** |
| // File Overview: |
| // ============== |
| // ICP files |
| // ICP.h -- All ICP class definitions. |
| // ICPlog.h -- ICP log object for logging system |
| // ICP.cc -- Incoming/outgoing ICP request and ICP configuration |
| // data base management. |
| // ICPConfig.cc -- ICP interface to Traffic Server configuration |
| // management, member functions for ICPlog (object |
| // passed to logging system) along with |
| // miscellaneous support routines. |
| // ICPevents.h -- Event definitions specific to ICP. |
| // ICPProcessor.h -- ICP external interface for other subsystems. |
| // External subsystems only need to include this |
| // header to use ICP. |
| // ICPProcessor.cc -- ICP external interface implementation. |
| // ICPStats.cc -- ICP statistic callback registration. |
| // |
| // |
| // Class Overview: |
| // =============== |
| // ICPConfigData -- Manages global ICP data from the TS configuration |
| // manager. |
| // PeerConfigData -- Manages ICP peer data from the TS configuration |
| // manager. |
| // ICPConfigUpdateCont -- Used by |
| // ICPConfiguration::icp_config_change_callback() |
| // to retry callout after a delay in cases where |
| // we cannot acquire the configuration lock. |
| // ICPConfiguration -- Overall manager of ICP configuration from TS |
| // configuration. Acts as interface and uses |
| // ICPConfigData and PeerConfigData to implement |
| // actions. Also fields/processes TS configuration |
| // callouts for "icp.config" changes. ICP classes only |
| // see ICPConfiguration when dealing with TS |
| // configuration info. |
| // |
| // Peer (base class) -- abstract base class |
| // ParentSiblingPeer : Peer -- ICP object describing parent/sibling |
| // peer which is initialized from the |
| // TS configuration data. |
| // MultiCastPeer : Peer -- ICP object describing MultiCast peer. |
| // Object is initialized from the TS |
| // configuration data. |
| // |
| // BitMap -- Generic bit map management class |
| // |
| // ICPProcessor -- Central class which starts all periodic events |
| // and maintains ICP configuration database. Delegates |
| // incoming data processing to ICPHandlerCont and |
| // outgoing data processing to ICPRequestCont. Implements |
| // reconfiguration actions and query requests from the |
| // external interface. |
| // |
| // ICPRequestCont -- Implements the state machine which processes |
| // locally generated ICP queries. Generates message |
| // queries and processes query responses. Responses |
| // received via callout from ICPPeerReadCont. |
| // |
| // PeriodicCont (base class) -- abstract base class |
| // ICPPeriodicCont : PeriodicCont -- Periodic which looks for ICP |
| // configuration changes sent by the Traffic Server |
| // configuration manager, and initiates ICP reconfiguration |
| // in the event we have a valid configuration change via |
| // ICPProcessor::ReconfigureStateMachine(). |
| // |
| // ICPHandlerCont : PeriodicCont -- Periodic which monitors incoming |
| // ICP sockets and starts processing of the incoming ICP data. |
| // |
| // ICPPeerReadCont -- Implements the incoming data state machine. |
| // Processes remote ICP query requests and passes query |
| // responses to ICPRequestCont via a callout. |
| // ICPlog -- Logging object which encapsulates ICP query info required |
| // by the new logging subsystem to produce squid access log |
| // data for ICP queries. |
| // |
| //**************************************************************************** |
| // |
| // ICP is integrated into HTTP miss processing as follows. |
| // |
| // if (HTTP Traffic Server Miss) { |
| // if (proxy.config.icp.enabled) { |
| // Status = QueryICP(URL, &target_ip); |
| // if (Status == ICP_HIT) |
| // Issue Http Request to (target_ip, proxy_port); |
| // } |
| // if (proxy.config.http.parent_proxy_routing_enable) { |
| // Issue Http Request to (proxy.config.http.parent_proxy_hostname, |
| // proxy.config.http.parent_proxy_port) |
| // } |
| // else |
| // Issue Http Request to Origin Server |
| // } |
| // |
| //**************************************************************************** |
| |
| // VC++ 5.0 is rather picky |
| typedef int (ICPPeerReadCont::*ICPPeerReadContHandler) (int, void *); |
| typedef int (ICPPeriodicCont::*ICPPeriodicContHandler) (int, void *); |
| typedef int (ICPHandlerCont::*ICPHandlerContHandler) (int, void *); |
| typedef int (ICPRequestCont::*ICPRequestContHandler) (int, void *); |
| |
| // Plugin freshness function |
| PluginFreshnessCalcFunc pluginFreshnessCalcFunc = (PluginFreshnessCalcFunc) NULL; |
| |
| //--------------------------------------- |
| // Class ICPHandlerCont member functions |
| // Deal with incoming ICP data |
| //--------------------------------------- |
| |
| // Static data declarations |
| //Allocator *ICPHandlerCont::IncomingICPDataBuf; |
| int64_t ICPHandlerCont::ICPDataBuf_IOBuffer_sizeindex; |
| static ClassAllocator <ICPPeerReadCont::PeerReadData>PeerReadDataAllocator("PeerReadDataAllocator"); |
| static ClassAllocator<ICPPeerReadCont> ICPPeerReadContAllocator("ICPPeerReadContAllocator"); |
| |
| static Action *default_action = NULL; |
| |
| |
| ICPHandlerCont::ICPHandlerCont(ICPProcessor * icpP) |
| : PeriodicCont(icpP) |
| { |
| } |
| |
| // do nothing continuation handler |
| int |
| ICPHandlerCont::TossEvent(int event, Event * e) |
| { |
| NOWARN_UNUSED(event); |
| NOWARN_UNUSED(e); |
| return EVENT_DONE; |
| } |
| |
| int |
| ICPHandlerCont::PeriodicEvent(int event, Event * e) |
| { |
| NOWARN_UNUSED(event); |
| NOWARN_UNUSED(e); |
| int n_peer, valid_peers; |
| Peer *P; |
| |
| // Periodic handler which initiates incoming message processing |
| // on the defined peers. |
| |
| valid_peers = _ICPpr->GetRecvPeers(); |
| |
| // get peer info from the completionEvent token. |
| switch (event) { |
| case EVENT_POLL: |
| case EVENT_INTERVAL: |
| { |
| // start read I/Os on peers which don't have outstanding I/Os |
| for (n_peer = 0; n_peer < valid_peers; ++n_peer) { |
| P = _ICPpr->GetNthRecvPeer(n_peer, _ICPpr->GetLastRecvPeerBias()); |
| if (!P || (P && !P->IsOnline())) |
| continue; |
| if (P->shouldStartRead()) { |
| P->startingRead(); |
| /////////////////////////////////////////// |
| // Setup state machine |
| /////////////////////////////////////////// |
| ICPPeerReadCont *s = ICPPeerReadContAllocator.alloc(); |
| int local_lookup = _ICPpr->GetConfig()->globalConfig()->ICPLocalCacheLookup(); |
| |
| s->init(_ICPpr, P, local_lookup); |
| RECORD_ICP_STATE_CHANGE(s, event, ICPPeerReadCont::READ_ACTIVE); |
| |
| /////////////////////////////////////////// |
| // Start processing |
| /////////////////////////////////////////// |
| s->handleEvent(EVENT_INTERVAL, (Event *) 0); |
| } |
| } |
| break; |
| } |
| default: |
| { |
| ink_release_assert(!"unexpected event"); |
| break; |
| } |
| } // End of switch |
| return EVENT_CONT; |
| } |
| |
| //*************************************************************************** |
| // Nested Class PeerReadData member functions |
| // Used by ICPPeerReadCont to encapsulate the data required by |
| // PeerReadStateMachine |
| //*************************************************************************** |
| ICPPeerReadCont::PeerReadData::PeerReadData() |
| { |
| init(); |
| } |
| |
| void |
| ICPPeerReadCont::PeerReadData::init() |
| { |
| _start_time = 0; |
| _mycont = 0; |
| _peer = 0; |
| _next_state = READ_ACTIVE; |
| _cache_lookup_local = 0; |
| _buf = 0; |
| _rICPmsg = 0; |
| _rICPmsg_len = 0; |
| _cachelookupURL.clear(); |
| _queryResult = 0; |
| _ICPReqCont = 0; |
| _bytesReceived = 0; |
| #ifdef DEBUG_ICP |
| _nhistory = 0; |
| #endif |
| memset((void *) &_sender, 0, sizeof(_sender)); |
| } |
| |
| ICPPeerReadCont::PeerReadData::~PeerReadData() |
| { |
| reset(1); |
| } |
| |
| void |
| ICPPeerReadCont::PeerReadData::reset(int full_reset) |
| { |
| if (full_reset) { |
| _peer = 0; |
| _buf = 0; |
| } |
| if (_rICPmsg) { |
| _rICPmsg = 0; |
| _rICPmsg_len = 0; |
| } |
| |
| if (_cachelookupURL.valid()) { |
| _cachelookupURL.destroy(); |
| } |
| } |
| |
| //*************************************************************************** |
| |
| //------------------------------------------------------------------------ |
| // ICPPeerReadCont -- ICP incoming message processing state machine |
| //------------------------------------------------------------------------ |
| ICPPeerReadCont::ICPPeerReadCont():Continuation(0), _object_vc(NULL), _object_read(NULL), |
| _cache_req_hdr_heap_handle(NULL), _cache_resp_hdr_heap_handle(NULL), _ICPpr(NULL), _state(NULL), |
| _start_time(0), _recursion_depth(0) |
| { |
| } |
| |
| void |
| ICPPeerReadCont::init(ICPProcessor * ICPpr, Peer * p, int lookup_local) |
| { |
| PeerReadData *s = PeerReadDataAllocator.alloc(); |
| s->init(); |
| s->_start_time = ink_get_hrtime(); |
| s->_peer = p; |
| s->_next_state = READ_ACTIVE; |
| s->_cache_lookup_local = lookup_local; |
| SET_HANDLER((ICPPeerReadContHandler) & ICPPeerReadCont::ICPPeerReadEvent); |
| _ICPpr = ICPpr; |
| _state = s; |
| _recursion_depth = -1; |
| _object_vc = NULL; |
| _object_read = NULL; |
| _cache_req_hdr_heap_handle = NULL; |
| _cache_resp_hdr_heap_handle = NULL; |
| mutex = new_ProxyMutex(); |
| } |
| |
| ICPPeerReadCont::~ICPPeerReadCont() |
| { |
| reset(1); // Full reset |
| } |
| |
| void |
| ICPPeerReadCont::reset(int full_reset) |
| { |
| mutex = 0; |
| if (this->_state) { |
| this->_state->reset(full_reset); |
| PeerReadDataAllocator.free(this->_state); |
| } |
| if (_cache_req_hdr_heap_handle) { |
| ats_free(_cache_req_hdr_heap_handle); |
| _cache_req_hdr_heap_handle = NULL; |
| } |
| if (_cache_resp_hdr_heap_handle) { |
| ats_free(_cache_resp_hdr_heap_handle); |
| _cache_resp_hdr_heap_handle = NULL; |
| } |
| } |
| |
| int |
| ICPPeerReadCont::ICPPeerReadEvent(int event, Event * e) |
| { |
| switch (event) { |
| case EVENT_INTERVAL: |
| case EVENT_IMMEDIATE: |
| { |
| break; |
| } |
| case NET_EVENT_DATAGRAM_WRITE_COMPLETE: |
| case NET_EVENT_DATAGRAM_READ_COMPLETE: |
| case NET_EVENT_DATAGRAM_READ_ERROR: |
| case NET_EVENT_DATAGRAM_WRITE_ERROR: |
| { |
| ink_assert((event != NET_EVENT_DATAGRAM_READ_COMPLETE) |
| || (_state->_next_state == READ_DATA_DONE)); |
| ink_assert((event != NET_EVENT_DATAGRAM_WRITE_COMPLETE) |
| || (_state->_next_state == WRITE_DONE)); |
| |
| ink_release_assert(this == (ICPPeerReadCont *) |
| completionUtil::getHandle(e)); |
| break; |
| } |
| case CACHE_EVENT_LOOKUP_FAILED: |
| case CACHE_EVENT_LOOKUP: |
| { |
| ink_assert(_state->_next_state == AWAITING_CACHE_LOOKUP_RESPONSE); |
| break; |
| } |
| default: |
| { |
| ink_release_assert(!"unexpected event"); |
| } |
| } // End of switch |
| |
| // Front end to PeerReadStateMachine(), invoked by Event subsystem. |
| if (PeerReadStateMachine(_state, e) == EVENT_CONT) { |
| eventProcessor.schedule_in(this, RETRY_INTERVAL, ET_ICP); |
| return EVENT_DONE; |
| |
| } else if (_state->_next_state == READ_PROCESSING_COMPLETE) { |
| _state->_peer->cancelRead(); |
| this->reset(1); // Full reset |
| ICPPeerReadContAllocator.free(this); |
| return EVENT_DONE; |
| |
| } else { |
| return EVENT_DONE; |
| } |
| } |
| |
| int |
| ICPPeerReadCont::StaleCheck(int event, Event * e) |
| { |
| ip_port_text_buffer ipb; |
| NOWARN_UNUSED(e); |
| ink_release_assert(mutex->thread_holding == this_ethread()); |
| |
| Debug("icp-stale", "Stale check res=%d for id=%d, [%s] from [%s]", |
| event, _state->_rICPmsg->h.requestno, |
| _state->_rICPmsg->un.query.URL, ats_ip_nptop(&_state->_sender, ipb, sizeof(ipb))); |
| |
| switch (event) { |
| case ICP_STALE_OBJECT: |
| { |
| _state->_queryResult = CACHE_EVENT_LOOKUP_FAILED; |
| break; |
| } |
| case ICP_FRESH_OBJECT: |
| { |
| _state->_queryResult = CACHE_EVENT_LOOKUP; |
| break; |
| } |
| default: |
| { |
| Debug("icp-stale", "ICPPeerReadCont::StaleCheck: Invalid Event %d\n", event); |
| _state->_queryResult = CACHE_EVENT_LOOKUP_FAILED; |
| break; |
| } |
| } |
| _object_vc->do_io(VIO::CLOSE); |
| _object_vc = 0; |
| SET_HANDLER((ICPPeerReadContHandler) & ICPPeerReadCont::ICPPeerReadEvent); |
| return handleEvent(_state->_queryResult, 0); |
| } |
| |
| int |
| ICPPeerReadCont::ICPPeerQueryEvent(int event, Event * e) |
| { |
| ip_port_text_buffer ipb; |
| NOWARN_UNUSED(e); |
| Debug("icp", "Remote Query lookup res=%d for id=%d, [%s] from [%s]", |
| event, _state->_rICPmsg->h.requestno, |
| _state->_rICPmsg->un.query.URL, ats_ip_nptop(&_state->_sender, ipb, sizeof(ipb))); |
| if (pluginFreshnessCalcFunc) { |
| switch (event) { |
| case CACHE_EVENT_OPEN_READ: |
| { |
| _object_vc = (CacheVConnection *) e; |
| SET_HANDLER((ICPPeerReadContHandler) & ICPPeerReadCont::StaleCheck); |
| _object_vc->get_http_info(&_object_read); |
| (*pluginFreshnessCalcFunc) ((void *) this); |
| return EVENT_DONE; |
| } |
| case CACHE_EVENT_OPEN_READ_FAILED: |
| { |
| event = CACHE_EVENT_LOOKUP_FAILED; |
| break; |
| } |
| default: |
| break; |
| } |
| } |
| // Process result |
| _state->_queryResult = event; |
| SET_HANDLER((ICPPeerReadContHandler) & ICPPeerReadCont::ICPPeerReadEvent); |
| return handleEvent(event, e); |
| } |
| |
| int |
| ICPPeerReadCont::ICPPeerQueryCont(int event, Event * e) |
| { |
| ip_port_text_buffer ipb; |
| NOWARN_UNUSED(event); |
| NOWARN_UNUSED(e); |
| |
| Action *a; |
| |
| // Perform lookup()/open_read() on behalf of PeerReadStateMachine() |
| |
| ((char *) _state->_rICPmsg)[MAX_ICP_MSGSIZE - 1] = 0; // null terminate |
| _state->_cachelookupURL.create(NULL); |
| const char *qurl = (const char *) _state->_rICPmsg->un.query.URL; |
| _state->_cachelookupURL.parse(qurl, strlen(qurl)); |
| Debug("icp", "Remote Query for id=%d, [%s] from [%s]", |
| _state->_rICPmsg->h.requestno, |
| _state->_rICPmsg->un.query.URL, |
| ats_ip_nptop(&_state->_sender, ipb, sizeof(ipb)) |
| ); |
| |
| SET_HANDLER((ICPPeerReadContHandler) & ICPPeerReadCont::ICPPeerQueryEvent); |
| if (_state->_rICPmsg->un.query.URL && *_state->_rICPmsg->un.query.URL) { |
| _state->_queryResult = ~CACHE_EVENT_LOOKUP_FAILED; |
| _start_time = ink_get_hrtime(); |
| if (pluginFreshnessCalcFunc && _ICPpr->GetConfig()->globalConfig()->ICPStaleLookup()) { |
| ////////////////////////////////////////////////////////////// |
| // Note: _cache_lookup_local is ignored in this case, since |
| // cache clustering is not used with stale lookup. |
| ////////////////////////////////////////////////////////////// |
| a = cacheProcessor.open_read(this, &_state->_cachelookupURL, |
| &gclient_request, &global_cache_lookup_config, (time_t) 0); |
| } else { |
| a = cacheProcessor.lookup(this, &_state->_cachelookupURL, _state->_cache_lookup_local); |
| } |
| if (!a) { |
| a = ACTION_IO_ERROR; |
| } |
| if (a == ACTION_RESULT_DONE) { |
| return EVENT_DONE; // callback complete |
| } else if (a == ACTION_IO_ERROR) { |
| handleEvent(CACHE_EVENT_LOOKUP_FAILED, 0); |
| return EVENT_DONE; // callback complete |
| } else { |
| return EVENT_CONT; // callback pending |
| } |
| } else { |
| // Null URL, return failed lookup |
| handleEvent(CACHE_EVENT_LOOKUP_FAILED, 0); |
| return EVENT_DONE; // callback done |
| } |
| } |
| |
| struct AutoReference |
| { |
| AutoReference(int *cnt) |
| { |
| _cnt = cnt; |
| (*_cnt)++; |
| } |
| ~AutoReference() |
| { |
| (*_cnt)--; |
| } |
| int *_cnt; |
| }; |
| |
| int |
| ICPPeerReadCont::PeerReadStateMachine(PeerReadData * s, Event * e) |
| { |
| AutoReference l(&_recursion_depth); |
| ip_port_text_buffer ipb; // scratch buffer for diagnostic messages. |
| //----------------------------------------------------------- |
| // State machine to process ICP data received on UDP socket |
| //----------------------------------------------------------- |
| MUTEX_TRY_LOCK(lock, this->mutex, this_ethread()); |
| if (!lock) { |
| // we didn't get the lock, so we don't need to unlock it |
| // coverity[missing_unlock] |
| return EVENT_CONT; // try again later |
| } |
| |
| while (1) { // loop forever |
| |
| switch (s->_next_state) { |
| case READ_ACTIVE: |
| { |
| ink_release_assert(_recursion_depth == 0); |
| if (!_ICPpr->Lock()) |
| return EVENT_CONT; // unable to get lock, try again later |
| |
| bool valid_peer = (_ICPpr->IdToPeer(s->_peer->GetPeerID()) == s->_peer); |
| |
| if (valid_peer && _ICPpr->AllowICPQueries() |
| && _ICPpr->GetConfig()->globalConfig()->ICPconfigured()) { |
| |
| // Note pending incoming ICP request or response |
| _ICPpr->IncPendingQuery(); |
| _ICPpr->Unlock(); |
| |
| s->_next_state = READ_DATA; |
| RECORD_ICP_STATE_CHANGE(s, 0, READ_DATA); |
| break; // move to next_state |
| |
| } else { |
| _ICPpr->Unlock(); |
| |
| // ICP NOT enabled, do nothing |
| s->_next_state = READ_PROCESSING_COMPLETE; |
| RECORD_ICP_STATE_CHANGE(s, 0, READ_PROCESSING_COMPLETE); |
| return EVENT_DONE; |
| } |
| } |
| #if !defined(__GNUC__) |
| _end_case_read_active: // fix DEC warnings |
| #endif |
| ink_release_assert(0); // Should never happen |
| |
| case READ_DATA: |
| { |
| ink_release_assert(_recursion_depth == 0); |
| |
| // Assumption of one outstanding read per peer... |
| // Setup read from FD |
| ink_assert(s->_peer->buf == NULL); |
| Ptr<IOBufferBlock> buf = s->_peer->buf = new_IOBufferBlock(); |
| buf->alloc(ICPHandlerCont::ICPDataBuf_IOBuffer_sizeindex); |
| s->_peer->fromaddrlen = sizeof(s->_peer->fromaddr); |
| buf->fill(sizeof(ICPMsg_t)); // reserve space for decoding |
| char *be = buf->buf_end() - 1; |
| be[0] = 0; // null terminate buffer |
| s->_next_state = READ_DATA_DONE; |
| RECORD_ICP_STATE_CHANGE(s, 0, READ_DATA_DONE); |
| ink_assert(s->_peer->readAction == NULL); |
| Action *a = s->_peer->RecvFrom_re(this, this, buf, |
| buf->write_avail() - 1, |
| &s->_peer->fromaddr.sa, |
| &s->_peer->fromaddrlen); |
| if (!a) { |
| a = ACTION_IO_ERROR; |
| } |
| if (a == ACTION_RESULT_DONE) { |
| // we will have been called back already and our state updated |
| // appropriately. |
| // move to next state |
| ink_assert(s->_next_state == PROCESS_READ_DATA); |
| break; |
| } else if (a == ACTION_IO_ERROR) { |
| // actually, this *could* be taken care of by the main handler, but |
| // error processing makes more sense at this point. Therefore, |
| // the main handler ignores the errors. |
| // |
| // No data, terminate read loop. |
| // |
| ICP_INCREMENT_DYN_STAT(no_data_read_stat); |
| s->_peer->buf = NULL; // release reference |
| s->_next_state = READ_NOT_ACTIVE_EXIT; |
| RECORD_ICP_STATE_CHANGE(s, 0, READ_NOT_ACTIVE_EXIT); |
| // move to next state |
| break; |
| } else { |
| s->_peer->readAction = a; |
| return EVENT_DONE; |
| } |
| } |
| #if !defined(__GNUC__) |
| _end_case_read_data: // fix DEC warnings |
| #endif |
| ink_release_assert(0); // Should never happen |
| |
| case READ_DATA_DONE: |
| { |
| // Convert ICP message from network to host format |
| if (s->_peer->readAction != NULL) { |
| ink_assert(s->_peer->readAction == e); |
| s->_peer->readAction = NULL; |
| } |
| s->_bytesReceived = completionUtil::getBytesTransferred(e); |
| |
| if (s->_bytesReceived >= 0) { |
| s->_next_state = PROCESS_READ_DATA; |
| RECORD_ICP_STATE_CHANGE(s, 0, PROCESS_READ_DATA); |
| } else { |
| ICP_INCREMENT_DYN_STAT(no_data_read_stat); |
| s->_peer->buf = NULL; // release reference |
| s->_next_state = READ_NOT_ACTIVE_EXIT; |
| RECORD_ICP_STATE_CHANGE(s, 0, READ_NOT_ACTIVE_EXIT); |
| } |
| if (_recursion_depth > 0) { |
| return EVENT_DONE; |
| } else { |
| break; |
| } |
| } |
| #if !defined(__GNUC__) |
| _end_case_read_data_done: // fix DEC warnings |
| #endif |
| ink_release_assert(0); // Should never happen |
| |
| case PROCESS_READ_DATA: |
| case ADD_PEER: |
| { |
| ink_release_assert(_recursion_depth == 0); |
| |
| Ptr<IOBufferBlock> bufblock = s->_peer->buf; |
| char *buf = bufblock->start(); |
| |
| if (s->_next_state == PROCESS_READ_DATA) { |
| ICPRequestCont::NetToHostICPMsg((ICPMsg_t *) |
| (buf + sizeof(ICPMsg_t)), (ICPMsg_t *) buf); |
| |
| // adjust buffer pointers to point to decoded message. |
| bufblock->reset(); |
| bufblock->fill(s->_bytesReceived); |
| |
| // Validate message length for sanity |
| if (s->_bytesReceived < ((ICPMsg_t *) buf)->h.msglen) { |
| // |
| // Short read, terminate |
| // |
| ICP_INCREMENT_DYN_STAT(short_read_stat); |
| s->_peer->buf = NULL; |
| s->_next_state = READ_NOT_ACTIVE; |
| RECORD_ICP_STATE_CHANGE(s, 0, READ_NOT_ACTIVE); |
| break; // move to next_state |
| } |
| } |
| // Validate receiver and convert the received sockaddr |
| // to internal sockaddr format. |
| IpEndpoint from; |
| if (!s->_peer->ExtToIntRecvSockAddr(&s->_peer->fromaddr.sa, &from.sa)) { |
| int status; |
| ICPConfigData *cfg = _ICPpr->GetConfig()->globalConfig(); |
| ICPMsg_t *ICPmsg = (ICPMsg_t *) buf; |
| |
| if ((cfg->ICPconfigured() == ICP_MODE_RECEIVE_ONLY) && |
| cfg->ICPReplyToUnknownPeer() && |
| ((ICPmsg->h.version == ICP_VERSION_2) || |
| (ICPmsg->h.version == ICP_VERSION_3)) && (ICPmsg->h.opcode == ICP_OP_QUERY)) { |
| |
| // |
| // Add the unknown Peer to our database to |
| // allow us to resolve the lookup request. |
| // |
| if (!_ICPpr->GetConfig()->Lock()) { |
| s->_next_state = ADD_PEER; |
| RECORD_ICP_STATE_CHANGE(s, 0, ADD_PEER); |
| return EVENT_CONT; |
| } |
| if (!_ICPpr->GetFreePeers() || !_ICPpr->GetFreeSendPeers()) { |
| Warning("ICP Peer limit exceeded"); |
| REC_SignalWarning(REC_SIGNAL_CONFIG_ERROR, "ICP Peer limit exceeded"); |
| _ICPpr->GetConfig()->Unlock(); |
| goto invalid_message; |
| } |
| |
| int icp_reply_port = cfg->ICPDefaultReplyPort(); |
| if (!icp_reply_port) { |
| icp_reply_port = ntohs(ats_ip_port_cast(&s->_peer->fromaddr)); |
| } |
| PeerConfigData *Pcfg = NEW(new PeerConfigData( |
| PeerConfigData::CTYPE_SIBLING, |
| IpAddr(s->_peer->fromaddr), |
| 0, |
| icp_reply_port |
| )); |
| ParentSiblingPeer *P = NEW(new ParentSiblingPeer(PEER_SIBLING, Pcfg, _ICPpr, true)); |
| status = _ICPpr->AddPeer(P); |
| ink_release_assert(status); |
| status = _ICPpr->AddPeerToSendList(P); |
| ink_release_assert(status); |
| |
| P->GetChan()->setRemote(P->GetIP()); |
| |
| // coverity[uninit_use_in_call] |
| Note("ICP Peer added ip=%s", ats_ip_nptop(P->GetIP(), ipb, sizeof(ipb))); |
| from = s->_peer->fromaddr; |
| } else { |
| invalid_message: |
| // |
| // Sender does not exist in ICP configuration, terminate |
| // |
| ICP_INCREMENT_DYN_STAT(invalid_sender_stat); |
| Debug("icp", "Received msg from invalid sender [%s]", |
| ats_ip_nptop(&s->_peer->fromaddr, ipb, sizeof(ipb))); |
| |
| s->_peer->buf = NULL; |
| s->_next_state = READ_NOT_ACTIVE; |
| RECORD_ICP_STATE_CHANGE(s, 0, READ_NOT_ACTIVE); |
| break; // move to next_state |
| } |
| } |
| // we hand off the decoded buffer from the Peer to the PeerReadData |
| s->_sender = from; |
| s->_rICPmsg_len = s->_bytesReceived; |
| ink_assert(s->_buf == NULL); |
| s->_buf = s->_peer->buf; |
| s->_rICPmsg = (ICPMsg_t *) s->_buf->start(); |
| s->_peer->buf = NULL; |
| |
| // |
| // Handle only ICP_VERSION_2/3 messages. Reject all others. |
| // |
| if ((s->_rICPmsg->h.version != ICP_VERSION_2) |
| && (s->_rICPmsg->h.version != ICP_VERSION_3)) { |
| ICP_INCREMENT_DYN_STAT(read_not_v2_icp_stat); |
| Debug("icp", "Received (v=%d) !v2 && !v3 msg from sender [%s]", |
| (uint32_t) s->_rICPmsg->h.version, ats_ip_nptop(&from, ipb, sizeof(ipb))); |
| |
| s->_rICPmsg = NULL; |
| s->_buf = NULL; |
| s->_next_state = READ_NOT_ACTIVE; |
| RECORD_ICP_STATE_CHANGE(s, 0, READ_NOT_ACTIVE); |
| break; // move to next_state |
| } |
| // |
| // If this is a query message, redirect to |
| // the query specific handlers. |
| // |
| if (s->_rICPmsg->h.opcode == ICP_OP_QUERY) { |
| ICP_INCREMENT_DYN_STAT(icp_remote_query_requests_stat); |
| ink_assert(!s->_mycont); |
| s->_next_state = AWAITING_CACHE_LOOKUP_RESPONSE; |
| RECORD_ICP_STATE_CHANGE(s, 0, AWAITING_CACHE_LOOKUP_RESPONSE); |
| |
| if (ICPPeerQueryCont(0, (Event *) 0) == EVENT_DONE) { |
| break; // Callback complete |
| } else { |
| return EVENT_DONE; // Callback pending |
| } |
| } else { |
| // We have a response message for an ICP query. |
| Debug("icp", "Response for Id=%d, from [%s]", |
| s->_rICPmsg->h.requestno, ats_ip_nptop(&s->_sender, ipb, sizeof(ipb))); |
| ICP_INCREMENT_DYN_STAT(icp_remote_responses_stat); |
| s->_next_state = GET_ICP_REQUEST; |
| RECORD_ICP_STATE_CHANGE(s, 0, GET_ICP_REQUEST); |
| break; // move to next_state |
| } |
| } |
| #if !defined(__GNUC__) |
| _end_case_process_data_read: // fix DEC warnings |
| #endif |
| ink_release_assert(0); // Should never happen |
| |
| case AWAITING_CACHE_LOOKUP_RESPONSE: |
| { |
| int status = 0; |
| void *data = s->_rICPmsg->un.query.URL; |
| int datalen = strlen((const char *) data) + 1; |
| |
| if (s->_queryResult == CACHE_EVENT_LOOKUP) { |
| // Use the received ICP data buffer for the response message |
| Debug("icp", "Sending ICP_OP_HIT for id=%d, [%.*s] to [%s]", |
| s->_rICPmsg->h.requestno, datalen, (const char *)data, ats_ip_nptop(&s->_sender, ipb, sizeof(ipb))); |
| ICP_INCREMENT_DYN_STAT(icp_cache_lookup_success_stat); |
| status = ICPRequestCont::BuildICPMsg(ICP_OP_HIT, |
| s->_rICPmsg->h.requestno, 0 /* optflags */ , 0 /* optdata */ , |
| 0 /* shostid */ , |
| data, datalen, &s->_mhdr, s->_iov, s->_rICPmsg); |
| } else if (s->_queryResult == CACHE_EVENT_LOOKUP_FAILED) { |
| // Use the received ICP data buffer for response message |
| Debug("icp", "Sending ICP_OP_MISS for id=%d, [%.*s] to [%s]", |
| s->_rICPmsg->h.requestno, datalen, (const char *)data, ats_ip_nptop(&s->_sender, ipb, sizeof(ipb))); |
| ICP_INCREMENT_DYN_STAT(icp_cache_lookup_fail_stat); |
| status = ICPRequestCont::BuildICPMsg(ICP_OP_MISS, |
| s->_rICPmsg->h.requestno, 0 /* optflags */ , 0 /* optdata */ , |
| 0 /* shostid */ , |
| data, datalen, &s->_mhdr, s->_iov, s->_rICPmsg); |
| } else { |
| Warning("Bad cache lookup event: %d", s->_queryResult); |
| ink_release_assert(!"Invalid cache lookup event"); |
| } |
| ink_assert(status == 0); |
| |
| // Make system log entry for ICP query |
| ICPlog logentry(s); |
| LogAccessICP accessor(&logentry); |
| Log::access(&accessor); |
| |
| s->_next_state = SEND_REPLY; |
| RECORD_ICP_STATE_CHANGE(s, 0, SEND_REPLY); |
| |
| if (_recursion_depth > 0) { |
| return EVENT_DONE; |
| } else { |
| break; |
| } |
| } |
| #if !defined(__GNUC__) |
| _end_case_awaiting_cache_lookup_response: // fix DEC warnings |
| #endif |
| ink_release_assert(0); // Should never happen |
| |
| case SEND_REPLY: |
| { |
| ink_release_assert(_recursion_depth == 0); |
| // |
| // Send the query response back to the sender |
| // |
| s->_next_state = WRITE_DONE; |
| RECORD_ICP_STATE_CHANGE(s, 0, WRITE_DONE); |
| ink_assert(s->_peer->writeAction == NULL); |
| Action *a = s->_peer->SendMsg_re(this, this, |
| &s->_mhdr, &s->_sender.sa); |
| if (!a) { |
| a = ACTION_IO_ERROR; |
| } |
| if (a == ACTION_RESULT_DONE) { |
| // we have been called back already and our state updated |
| // appropriately |
| break; |
| |
| } else if (a == ACTION_IO_ERROR) { |
| // Partial write. |
| ICP_INCREMENT_DYN_STAT(query_response_partial_write_stat); |
| // coverity[uninit_use_in_call] |
| Debug("icp_warn", "ICP response send, sent=%d res=%d, ip=%s", |
| ntohs(s->_rICPmsg->h.msglen), -1, ats_ip_ntop(&s->_sender, ipb, sizeof(ipb))); |
| s->_next_state = READ_NOT_ACTIVE; |
| RECORD_ICP_STATE_CHANGE(s, 0, READ_NOT_ACTIVE); |
| break; |
| } else { |
| s->_peer->writeAction = a; |
| return EVENT_DONE; |
| } |
| } |
| #if !defined(__GNUC__) |
| _end_case_send_reply: // fix DEC warnings |
| #endif |
| ink_release_assert(0); // Should never happen |
| |
| case WRITE_DONE: |
| { |
| s->_peer->writeAction = NULL; |
| int len = completionUtil::getBytesTransferred(e); |
| |
| if (len == (int)ntohs(s->_rICPmsg->h.msglen)) { |
| ICP_INCREMENT_DYN_STAT(query_response_write_stat); |
| s->_peer->LogSendMsg(s->_rICPmsg, &s->_sender.sa); // log query reply |
| } else { |
| // Partial write. |
| ICP_INCREMENT_DYN_STAT(query_response_partial_write_stat); |
| // coverity[uninit_use_in_call] |
| Debug("icp_warn", "ICP response send, sent=%d res=%d, ip=%s", |
| ntohs(s->_rICPmsg->h.msglen), len, ats_ip_ntop(&s->_sender, ipb, sizeof(ipb))); |
| } |
| // Processing complete, perform completion actions |
| s->_next_state = READ_NOT_ACTIVE; |
| RECORD_ICP_STATE_CHANGE(s, 0, READ_NOT_ACTIVE); |
| Debug("icp", "state->READ_NOT_ACTIVE"); |
| |
| if (_recursion_depth > 0) { |
| return EVENT_DONE; |
| } else { |
| break; // move to next_state |
| } |
| } |
| #if !defined(__GNUC__) |
| _end_case_write_done: // fix DEC warnings |
| #endif |
| ink_release_assert(0); // Should never happen |
| |
| case GET_ICP_REQUEST: |
| { |
| ink_release_assert(_recursion_depth == 0); |
| ink_assert(s->_rICPmsg && s->_rICPmsg_len); // Sanity check |
| |
| // Get ICP request associated with response message |
| s->_ICPReqCont = ICPRequestCont::FindICPRequest(s->_rICPmsg->h.requestno); |
| if (s->_ICPReqCont) { |
| s->_next_state = GET_ICP_REQUEST_MUTEX; |
| RECORD_ICP_STATE_CHANGE(s, 0, GET_ICP_REQUEST_MUTEX); |
| break; // move to next_state |
| } |
| // |
| // No ICP request for response message, log as "response |
| // for non-existent ICP request" and terminate processing |
| // |
| Debug("icp", "No ICP Request for Id=%d", s->_rICPmsg->h.requestno); |
| ICP_INCREMENT_DYN_STAT(no_icp_request_for_response_stat); |
| Peer *p = _ICPpr->FindPeer(s->_sender); |
| p->LogRecvMsg(s->_rICPmsg, 0); |
| s->_next_state = READ_NOT_ACTIVE; |
| RECORD_ICP_STATE_CHANGE(s, 0, READ_NOT_ACTIVE); |
| break; // move to next_state |
| } |
| #if !defined(__GNUC__) |
| _end_case_get_icp_request: // fix DEC warnings |
| #endif |
| ink_release_assert(0); // Should never happen |
| |
| case GET_ICP_REQUEST_MUTEX: |
| { |
| ink_release_assert(_recursion_depth == 0); |
| ink_assert(s->_ICPReqCont); |
| Ptr<ProxyMutex> ICPReqContMutex(s->_ICPReqCont->mutex); |
| EThread *ethread = this_ethread(); |
| ink_hrtime request_start_time; |
| |
| if (!MUTEX_TAKE_TRY_LOCK(ICPReqContMutex, ethread)) { |
| ICP_INCREMENT_DYN_STAT(icp_response_request_nolock_stat); |
| // |
| // Unable to get ICP request mutex, delay and move back |
| // to the GET_ICP_REQUEST state. We need to do this |
| // since the ICP request may be deallocated by the active |
| // continuation. |
| // |
| s->_ICPReqCont = (ICPRequestCont *) 0; |
| s->_next_state = GET_ICP_REQUEST; |
| RECORD_ICP_STATE_CHANGE(s, 0, GET_ICP_REQUEST); |
| return EVENT_CONT; |
| } |
| // Log as "response for ICP request" |
| Peer *p = _ICPpr->FindPeer(s->_sender); |
| p->LogRecvMsg(s->_rICPmsg, 1); |
| |
| // Process the ICP response for the given ICP request |
| ICPRequestCont::ICPRequestEventArgs_t args; |
| args.rICPmsg = s->_rICPmsg; |
| args.rICPmsg_len = s->_rICPmsg_len; |
| args.peer = p; |
| if (!s->_ICPReqCont->GetActionPtr()->cancelled) { |
| request_start_time = s->_ICPReqCont->GetRequestStartTime(); |
| Debug("icp", "Passing Reply for ICP Id=%d", s->_rICPmsg->h.requestno); |
| s->_ICPReqCont->handleEvent((int) ICP_RESPONSE_MESSAGE, (void *) &args); |
| } else { |
| request_start_time = 0; |
| delete s->_ICPReqCont; |
| Debug("icp", "User cancelled ICP request Id=%d", s->_rICPmsg->h.requestno); |
| } |
| |
| // Note: s->_ICPReqCont is deallocated at this point. |
| s->_ICPReqCont = 0; |
| |
| MUTEX_UNTAKE_LOCK(ICPReqContMutex, ethread); |
| if (request_start_time) { |
| ICP_SUM_DYN_STAT(total_icp_response_time_stat, (ink_get_hrtime() - request_start_time)); |
| } |
| RECORD_ICP_STATE_CHANGE(s, 0, READ_NOT_ACTIVE); |
| s->_next_state = READ_NOT_ACTIVE; |
| break; // move to next_state |
| } |
| #if !defined(__GNUC__) |
| _end_case_get_icp_request_mutex: // fix DEC warnings |
| #endif |
| ink_release_assert(0); // Should never happen |
| |
| case READ_NOT_ACTIVE: |
| case READ_NOT_ACTIVE_EXIT: |
| { |
| ink_release_assert(_recursion_depth == 0); |
| if (!_ICPpr->Lock()) |
| return EVENT_CONT; // unable to get lock, try again later |
| |
| // Note incoming ICP request or response completion |
| _ICPpr->DecPendingQuery(); |
| _ICPpr->Unlock(); |
| |
| s->_buf = 0; |
| if (s->_next_state == READ_NOT_ACTIVE_EXIT) { |
| s->_next_state = READ_PROCESSING_COMPLETE; |
| return EVENT_DONE; |
| } else { |
| // Last read was valid, see if any more read data before exiting |
| s->reset(); |
| s->_start_time = ink_get_hrtime(); |
| s->_next_state = READ_ACTIVE; |
| RECORD_ICP_STATE_CHANGE(s, 0, READ_ACTIVE); |
| break; // restart |
| } |
| } |
| #if !defined(__GNUC__) |
| _end_case_read_not_active: // fix DEC warnings |
| #endif |
| ink_release_assert(0); // Should never happen |
| |
| case READ_PROCESSING_COMPLETE: |
| default: |
| ink_release_assert(0); // Should never happen |
| |
| } // End of switch |
| |
| } // End of while(1) |
| } |
| |
| //------------------------------------------------------------------------ |
| // Class ICPRequestCont member functions |
| // Implements the state machine which processes locally generated |
| // ICP queries. |
| //------------------------------------------------------------------------ |
| ClassAllocator<ICPRequestCont> ICPRequestCont_allocator("ICPRequestCont_allocator"); |
| |
| ICPRequestCont::ICPRequestCont(ICPProcessor * pr, Continuation * c, URL * u) |
| : Continuation(0), _cont(c), _url(u), _start_time(0), |
| _ICPpr(pr), _timeout(0), |
| npending_actions(0), pendingActions(NULL), |
| _sequence_number(0), _expected_replies(0), |
| _expected_replies_list(MAX_DEFINED_PEERS), _received_replies(0), _next_state(ICP_START) |
| { |
| memset((void *)&_ret_sockaddr, 0, sizeof(_ret_sockaddr)); |
| _ret_status = ICP_LOOKUP_FAILED; |
| _act.cancelled = false; |
| _act = c; |
| memset((void *) &_ICPmsg, 0, sizeof(_ICPmsg)); |
| memset((void *) &_sendMsgHdr, 0, sizeof(_sendMsgHdr)); |
| memset((void *) &_sendMsgIOV, 0, sizeof(_sendMsgIOV[MSG_IOVECS])); |
| |
| if (c) |
| this->mutex = c->mutex; |
| } |
| |
| ICPRequestCont::~ICPRequestCont() |
| { |
| _act = NULL; |
| this->mutex = NULL; |
| |
| if (_timeout) { |
| _timeout->cancel(this); |
| _timeout = 0; |
| } |
| RemoveICPRequest(_sequence_number); |
| |
| if (_ICPmsg.h.opcode == ICP_OP_QUERY) { |
| if (_ICPmsg.un.query.URL) { |
| ats_free(_ICPmsg.un.query.URL); |
| } |
| } |
| if (pendingActions) { |
| delete pendingActions; |
| pendingActions = 0; |
| } |
| } |
| |
| void |
| ICPRequestCont::remove_from_pendingActions(Action * a) |
| { |
| if (!pendingActions) { |
| npending_actions--; |
| return; |
| } |
| for (intptr_t i = 0; i < pendingActions->length(); i++) { |
| if ((*pendingActions)[i] == a) { |
| for (intptr_t j = i; j < pendingActions->length() - 1; j++) |
| (*pendingActions)[j] = (*pendingActions)[j + 1]; |
| pendingActions->set_length(pendingActions->length() - 1); |
| npending_actions--; |
| return; |
| } |
| } |
| npending_actions--; // completed inline |
| } |
| |
| void |
| ICPRequestCont::remove_all_pendingActions() |
| { |
| int active_pendingActions = 0; |
| |
| if (!pendingActions) { |
| return; |
| } |
| for (intptr_t i = 0; i < pendingActions->length(); i++) { |
| if ((*pendingActions)[i] |
| && ((*pendingActions)[i] != ACTION_IO_ERROR)) { |
| ((*pendingActions)[i])->cancel(); |
| (*pendingActions)[i] = 0; |
| npending_actions--; |
| active_pendingActions++; |
| } else { |
| (*pendingActions)[i] = 0; |
| } |
| } |
| pendingActions->set_length(pendingActions->length() - active_pendingActions); |
| } |
| |
| int |
| ICPRequestCont::ICPRequestEvent(int event, Event * e) |
| { |
| // Note: Passed parameter 'e' is not an Event * |
| // if event == ICP_RESPONSE_MESSAGE |
| |
| ink_assert(event == NET_EVENT_DATAGRAM_WRITE_COMPLETE || |
| event == NET_EVENT_DATAGRAM_WRITE_ERROR || |
| event == EVENT_IMMEDIATE || event == EVENT_INTERVAL || event == ICP_RESPONSE_MESSAGE); |
| // handle reentrant callback |
| if ((event == NET_EVENT_DATAGRAM_WRITE_COMPLETE) |
| || (event == NET_EVENT_DATAGRAM_WRITE_ERROR)) { |
| ink_assert(npending_actions > 0); |
| remove_from_pendingActions((Action *) e); |
| return EVENT_DONE; |
| } |
| // Start of user ICP query request processing. We start here after |
| // the reschedule in ICPProcessor::ICPQuery(). |
| switch (_next_state) { |
| case ICP_START: |
| case ICP_OFF_TERMINATE: |
| case ICP_QUEUE_REQUEST: |
| case ICP_AWAITING_RESPONSE: |
| case ICP_DEQUEUE_REQUEST: |
| case ICP_POST_COMPLETION: |
| case ICP_REQUEST_NOT_ACTIVE: |
| { |
| if (ICPStateMachine(event, (void *) e) == EVENT_CONT) { |
| // |
| // Unable to acquire lock, reschedule continuation |
| // |
| eventProcessor.schedule_in(this, HRTIME_MSECONDS(RETRY_INTERVAL), ET_ICP); |
| return EVENT_CONT; |
| |
| } else if (_next_state == ICP_DONE) { |
| // |
| // ICP request processing complete. |
| // |
| delete this; |
| break; |
| } else { |
| break; |
| } |
| } |
| #if !defined(__GNUC__) |
| _end_case: // fix DEC warnings |
| #endif |
| ink_release_assert(0); // should never happen |
| |
| case ICP_DONE: |
| default: |
| ink_release_assert(0); // should never happen |
| } // End of switch |
| |
| return EVENT_DONE; |
| } |
| |
| int |
| ICPRequestCont::NopICPRequestEvent(int event, Event * e) |
| { |
| NOWARN_UNUSED(event); |
| NOWARN_UNUSED(e); |
| delete this; |
| return EVENT_DONE; |
| } |
| |
| int |
| ICPRequestCont::ICPStateMachine(int event, void *d) |
| { |
| //******************************************* |
| // ICP message processing state machine |
| //******************************************* |
| ICPConfiguration *ICPcf = _ICPpr->GetConfig(); |
| ip_port_text_buffer ipb; |
| |
| while (1) { // loop forever |
| |
| switch (_next_state) { |
| case ICP_START: |
| { |
| // User may have cancelled request, if so abort request. |
| if (_act.cancelled) { |
| _next_state = ICP_DONE; |
| return EVENT_DONE; |
| } |
| |
| if (!_ICPpr->Lock()) |
| return EVENT_CONT; // Unable to get lock, try again later |
| |
| if (_ICPpr->AllowICPQueries() && (ICPcf->globalConfig()->ICPconfigured() == ICP_MODE_SEND_RECEIVE)) { |
| |
| // Reject NULL pointer or "localhost" URLs |
| if (_url->valid()) { |
| int host_len; |
| const char *host = _url->host_get(&host_len); |
| if (ptr_len_casecmp(host, host_len, "127.0.0.1") == 0 || ptr_len_casecmp(host, host_len, "localhost") == 0) { |
| _ICPpr->Unlock(); |
| |
| // NULL pointer or "localhost" URL, terminate request |
| _next_state = ICP_OFF_TERMINATE; |
| Debug("icp", "[ICP_START] NULL/localhost URL ignored Id=%d", _sequence_number); |
| break; // move to next_state |
| } |
| } |
| // Note pending ICP request |
| _ICPpr->IncPendingQuery(); |
| _ICPpr->Unlock(); |
| |
| // Build the ICP query message |
| char *urlstr = _url->string_get(NULL); |
| int urlstr_len = strlen(urlstr) + 1; |
| |
| int status = BuildICPMsg(ICP_OP_QUERY, |
| _sequence_number = ICPReqSeqNumber(), |
| 0 /* optflags */ , 0 /* optdata */ , |
| 0 /* shostid */ , |
| (void *) urlstr, urlstr_len, |
| &_sendMsgHdr, _sendMsgIOV, |
| &_ICPmsg); |
| // urlstr memory freed in destructor |
| ink_assert(status == 0); |
| Debug("icp", "[ICP_START] ICP_OP_QUERY for [%s], Id=%d", urlstr, _sequence_number); |
| |
| _next_state = ICP_QUEUE_REQUEST; |
| break; // move to next_state |
| |
| } else { |
| ICP_INCREMENT_DYN_STAT(icp_start_icpoff_stat); |
| _ICPpr->Unlock(); |
| |
| // ICP NOT enabled, terminate request |
| _next_state = ICP_OFF_TERMINATE; |
| break; // move to next_state |
| } |
| } |
| #if !defined(__GNUC__) |
| _end_case_icp_start: // fix DEC warnings |
| #endif |
| ink_release_assert(0); // should never happen |
| |
| case ICP_OFF_TERMINATE: |
| { |
| if (!MUTEX_TAKE_TRY_LOCK_FOR(mutex, this_ethread(), _cont)) { |
| return EVENT_CONT; // unable to get lock, delay and retry |
| } |
| Debug("icp", "[ICP_OFF_TERMINATE] Id=%d", _sequence_number); |
| |
| // ICP NOT enabled, post completion on request |
| if (!_act.cancelled) { |
| _cont->handleEvent(_ret_status, (void *) &_ret_sockaddr); |
| } |
| MUTEX_UNTAKE_LOCK(mutex, this_ethread()); |
| |
| _next_state = ICP_DONE; |
| return EVENT_DONE; |
| } |
| #if !defined(__GNUC__) |
| _end_case_icp_off_terminate: // fix DEC warnings |
| #endif |
| ink_release_assert(0); // should never happen |
| |
| case ICP_QUEUE_REQUEST: |
| { |
| // Place ICP request on the pending request queue |
| int ret = AddICPRequest(_sequence_number, this); |
| ink_assert(ret == 0); |
| |
| // Generate ICP requests to peers |
| int bias = _ICPpr->GetStartingSendPeerBias(); |
| int SendPeers = _ICPpr->GetSendPeers(); |
| npending_actions = 0; |
| while (SendPeers > 0) { |
| Peer *P = _ICPpr->GetNthSendPeer(SendPeers, bias); |
| if (!P->IsOnline()) { |
| SendPeers--; |
| continue; |
| } |
| // |
| // Send query request to Peers |
| // |
| |
| // because of reentrancy, we have to do this first, just |
| // in case we get called back immediately. |
| int was_expected = P->ExpectedReplies(&_expected_replies_list); |
| _expected_replies += was_expected; |
| npending_actions++; |
| Action *a = P->SendMsg_re(this, P, &_sendMsgHdr, NULL); |
| if (!a) { |
| a = ACTION_IO_ERROR; |
| } |
| if (a != ACTION_IO_ERROR) { |
| if (a != ACTION_RESULT_DONE) { |
| if (!pendingActions) { |
| pendingActions = NEW(new DynArray<Action *>(&default_action)); |
| } |
| (*pendingActions) (npending_actions) = a; |
| } |
| P->LogSendMsg(&_ICPmsg, NULL); // log as send query |
| Debug("icp", "[ICP_QUEUE_REQUEST] Id=%d send query to [%s]", |
| _sequence_number, ats_ip_nptop(P->GetIP(), ipb, sizeof(ipb))); |
| } else { |
| _expected_replies_list.ClearBit(P->GetPeerID()); |
| _expected_replies -= was_expected; |
| // Partial or failed write. |
| ICP_INCREMENT_DYN_STAT(send_query_partial_write_stat); |
| // coverity[uninit_use_in_call] |
| Debug("icp_warn", |
| "ICP query send, res=%d, ip=%s", ntohs(_ICPmsg.h.msglen), |
| ats_ip_ntop(P->GetIP(), ipb, sizeof(ipb))); |
| } |
| SendPeers--; |
| } |
| |
| Debug("icp", "[ICP_QUEUE_REQUEST] Id=%d expected replies=%d", _sequence_number, _expected_replies); |
| if (!_expected_replies) { |
| // |
| // Nothing to wait for, terminate ICP processing |
| // |
| ICP_INCREMENT_DYN_STAT(icp_queries_no_expected_replies_stat); |
| _next_state = ICP_DEQUEUE_REQUEST; |
| break; // move to next_state |
| } |
| ICP_SUM_DYN_STAT(total_udp_send_queries_stat, _expected_replies); |
| |
| // |
| // Setup ICP request response timeout |
| // |
| int tval = _ICPpr->GetConfig()->globalConfig()->ICPqueryTimeout(); |
| _timeout = eventProcessor.schedule_in(this, HRTIME_SECONDS(tval), ET_ICP); |
| |
| _next_state = ICP_AWAITING_RESPONSE; |
| return EVENT_DONE; |
| } |
| #if !defined(__GNUC__) |
| _end_case_icp_queue_request: // fix DEC warnings |
| #endif |
| ink_release_assert(0); // should never happen |
| |
| case ICP_AWAITING_RESPONSE: |
| { |
| Debug("icp", "[ICP_AWAITING_RESPONSE] Id=%d", _sequence_number); |
| ink_assert(d); |
| ICPRequestEventArgs_t dummyArgs; |
| ICPRequestEventArgs_t *args = 0; |
| |
| if (event == ICP_RESPONSE_MESSAGE) { |
| args = (ICPRequestEventArgs_t *) d; |
| } else if (event == EVENT_INTERVAL) { |
| memset((void *) &dummyArgs, 0, sizeof(dummyArgs)); |
| args = &dummyArgs; |
| } else { |
| ink_release_assert(0); // should never happen |
| } |
| |
| // Process ICP response |
| if (ICPResponseMessage(event, args->rICPmsg, args->rICPmsg_len, args->peer) == EVENT_DONE) { |
| // ICP Request processing is complete, do completion actions |
| _next_state = ICP_DEQUEUE_REQUEST; |
| break; // move to next_state |
| |
| } else { |
| // Continue to wait for additional replies |
| return EVENT_DONE; |
| } |
| } |
| #if !defined(__GNUC__) |
| _end_case_icp_awaiting_response: // fix DEC warnings |
| #endif |
| ink_release_assert(0); // should never happen |
| |
| case ICP_DEQUEUE_REQUEST: |
| { |
| // Remove ICP request from active queue |
| int ret = RemoveICPRequest(_sequence_number); |
| Debug("icp", "[ICP_DEQUEUE_REQUEST] Id=%d", _sequence_number); |
| ink_assert(ret == 0); |
| //_sequence_number = 0; // moved to REQUEST_NOT_ACTIVE |
| _next_state = ICP_POST_COMPLETION; |
| break; // move to next_state |
| } |
| #if !defined(__GNUC__) |
| _end_case_icp_dequeue_request: // fix DEC warnings |
| #endif |
| ink_release_assert(0); // should never happen |
| |
| case ICP_POST_COMPLETION: |
| { |
| if (!MUTEX_TAKE_TRY_LOCK_FOR(mutex, this_ethread(), _cont)) { |
| return EVENT_CONT; // unable to get lock, delay and retry |
| } |
| Debug("icp", "[ICP_POST_COMPLETION] Id=%d", _sequence_number); |
| |
| // Post completion on the ICP request. |
| if (!_act.cancelled) { |
| _cont->handleEvent(_ret_status, (void *) &_ret_sockaddr); |
| } |
| MUTEX_UNTAKE_LOCK(mutex, this_ethread()); |
| ICP_SUM_DYN_STAT(total_icp_request_time_stat, (ink_get_hrtime() - _start_time)); |
| |
| _next_state = ICP_WAIT_SEND_COMPLETE; |
| break; // move to next_state |
| } |
| #if !defined(__GNUC__) |
| _end_case_icp_post_completion: // fix DEC warnings |
| #endif |
| ink_release_assert(0); // should never happen |
| case ICP_WAIT_SEND_COMPLETE: |
| { |
| // wait for all the sends to complete. |
| if (npending_actions > 0) { |
| Debug("icp", "[ICP_WAIT_SEND_COMPLETE] Id=%d active=%d", _sequence_number, npending_actions); |
| } else { |
| _next_state = ICP_REQUEST_NOT_ACTIVE; |
| // move to next state |
| break; |
| } |
| } |
| break; |
| #if !defined(__GNUC__) |
| _end_case_icp_wait_send_complete: // fix DEC warnings |
| #endif |
| ink_release_assert(0); // should never happen |
| case ICP_REQUEST_NOT_ACTIVE: |
| { |
| Debug("icp", "[ICP_REQUEST_NOT_ACTIVE] Id=%d", _sequence_number); |
| _sequence_number = 0; |
| if (!_ICPpr->Lock()) |
| return EVENT_CONT; // Unable to get lock, try again later |
| |
| // Note pending ICP request completion |
| _ICPpr->DecPendingQuery(); |
| _ICPpr->Unlock(); |
| |
| _next_state = ICP_DONE; |
| return EVENT_DONE; |
| } |
| #if !defined(__GNUC__) |
| _end_case_icp_request_not_active: // fix DEC warnings |
| #endif |
| ink_release_assert(0); // should never happen |
| |
| case ICP_DONE: |
| default: |
| ink_release_assert(0); // should never happen |
| |
| } // End of switch |
| |
| } // End of while(1) |
| } |
| |
| int |
| ICPRequestCont::ICPResponseMessage(int event, ICPMsg_t * m, int ICPMsg_len, Peer * peer) |
| { |
| NOWARN_UNUSED(ICPMsg_len); |
| ip_port_text_buffer ipb, ipb2; |
| if (event == EVENT_INTERVAL) { |
| |
| _timeout = 0; |
| remove_all_pendingActions(); |
| |
| // ICP request response timeout, if we received a response from |
| // any parent, return it to resolve the miss. |
| |
| if (_received_replies) { |
| int NumParentPeers = _ICPpr->GetParentPeers(); |
| if (NumParentPeers > 0) { |
| int n; |
| Peer *pp; |
| for (n = 0; n < NumParentPeers; n++) { |
| pp = _ICPpr->GetNthParentPeer(0, _ICPpr->GetStartingParentPeerBias()); |
| if (pp && !_expected_replies_list.IsBitSet(pp->GetPeerID()) |
| && pp->isUp()) { |
| ats_ip_copy(&_ret_sockaddr.sa, pp->GetIP()); |
| _ret_sockaddr.port() = htons(static_cast<ParentSiblingPeer*>(pp)->GetProxyPort()); |
| _ret_status = ICP_LOOKUP_FOUND; |
| |
| Debug("icp", |
| "ICP timeout using parent Id=%d from [%s] return [%s]", |
| _sequence_number, |
| ats_ip_nptop(pp->GetIP(), ipb, sizeof(ipb)), |
| ats_ip_nptop(&_ret_sockaddr, ipb2, sizeof(ipb2)) |
| ); |
| return EVENT_DONE; |
| } |
| } |
| } |
| } |
| // Timeout received on ICP request, return ICP_LOOKUP_FAILED |
| Debug("icp", "ICP Response timeout for Id=%d", _sequence_number); |
| return EVENT_DONE; |
| |
| } else { |
| // We have received a response to our ICP query request. |
| // See if this response resolves the ICP query. |
| // |
| ink_assert(m->h.requestno == _sequence_number); |
| |
| switch (m->h.opcode) { |
| case ICP_OP_HIT: |
| case ICP_OP_HIT_OBJ: |
| { |
| // Kill timeout event |
| _timeout->cancel(this); |
| _timeout = 0; |
| |
| ICP_INCREMENT_DYN_STAT(icp_query_hits_stat); |
| ++_received_replies; |
| ats_ip_copy(&_ret_sockaddr, peer->GetIP()); |
| _ret_sockaddr.port() = htons(static_cast<ParentSiblingPeer*>(peer)->GetProxyPort()); |
| _ret_status = ICP_LOOKUP_FOUND; |
| |
| Debug("icp", |
| "ICP Response HIT for Id=%d from [%s] return [%s]", |
| _sequence_number, |
| ats_ip_nptop(peer->GetIP(), ipb, sizeof(ipb)), |
| ats_ip_nptop(&_ret_sockaddr, ipb2, sizeof(ipb2)) |
| ); |
| return EVENT_DONE; |
| } |
| case ICP_OP_MISS: |
| case ICP_OP_ERR: |
| case ICP_OP_MISS_NOFETCH: |
| case ICP_OP_DENIED: |
| { |
| Debug("icp", "ICP MISS response for Id=%d from [%s]", |
| _sequence_number, ats_ip_nptop(peer->GetIP(), ipb, sizeof(ipb))); |
| // "received_replies" is only for Peers who we expect a reply |
| // from (Peers which are in the expected_replies_list). |
| int Id = peer->GetPeerID(); |
| if (_expected_replies_list.IsBitSet(Id)) { |
| // Clear bit to note receipt of reply |
| _expected_replies_list.ClearBit(Id); |
| ++_received_replies; |
| } |
| |
| if (_received_replies < _expected_replies) |
| return EVENT_CONT; // wait for more responses |
| |
| // Kill timeout event |
| _timeout->cancel(this); |
| _timeout = 0; |
| |
| ICP_INCREMENT_DYN_STAT(icp_query_misses_stat); |
| // |
| // All responders have returned ICP_OP_MISS. |
| // If parents exists, select one to resolve the request. |
| // |
| if (_ICPpr->GetParentPeers() > 0) { |
| // In cases where multiple parents exist, we use |
| // a round robin scheme. |
| Peer *p = NULL; |
| // try to find an UP parent, if none, return ICP_LOOKUP_FAILED |
| { |
| int i; |
| for (i = 0; i < _ICPpr->GetParentPeers(); i++) { |
| p = _ICPpr->GetNthParentPeer(0, _ICPpr->GetStartingParentPeerBias()); |
| // find an UP parent |
| if (p->isUp()) |
| break; |
| } |
| // if no parent is selected, then return ICP_LOOKUP_FAILED |
| if (i >= _ICPpr->GetParentPeers()) { |
| Debug("icp", "None of the %d ICP parent(s) is up", _ICPpr->GetParentPeers()); |
| p = NULL; |
| } |
| } |
| if (p) { |
| ats_ip_copy(&_ret_sockaddr, p->GetIP()); |
| _ret_sockaddr.port() = htons(static_cast<ParentSiblingPeer*>(p)->GetProxyPort()); |
| _ret_status = ICP_LOOKUP_FOUND; |
| |
| Debug("icp", "ICP ALL MISS(1) for Id=%d return [%s]", |
| _sequence_number, ats_ip_nptop(&_ret_sockaddr, ipb, sizeof(ipb))); |
| return EVENT_DONE; |
| } |
| } |
| Debug("icp", "ICP ALL MISS(2) for Id=%d return [%s]", |
| _sequence_number, ats_ip_nptop(&_ret_sockaddr, ipb, sizeof(ipb))); |
| return EVENT_DONE; |
| } |
| default: |
| { |
| ICP_INCREMENT_DYN_STAT(invalid_icp_query_response_stat); |
| // coverity[uninit_use_in_call] |
| Warning("Invalid ICP response, op=%d reqno=%d ip=%s", |
| m->h.opcode, m->h.requestno, ats_ip_ntop(peer->GetIP(), ipb, sizeof(ipb))); |
| return EVENT_CONT; // wait for more responses |
| } |
| |
| } // End of switch |
| } |
| } |
| |
| //------------------------------------------------ |
| // Class ICPRequestCont static member functions |
| //------------------------------------------------ |
| |
| // Static member function |
| void |
| ICPRequestCont::NetToHostICPMsg(ICPMsg_t * in, ICPMsg_t * out) |
| { |
| out->h.opcode = in->h.opcode; |
| out->h.version = in->h.version; |
| out->h.msglen = ntohs(in->h.msglen); |
| out->h.requestno = ntohl(in->h.requestno); |
| out->h.optionflags = ntohl(in->h.optionflags); |
| out->h.optiondata = ntohl(in->h.optiondata); |
| out->h.shostid = ntohl(in->h.shostid); |
| |
| switch (in->h.opcode) { |
| case ICP_OP_QUERY: |
| { |
| memcpy((char *) &out->un.query.rhostid, |
| (char *) ((char *) (&in->h.shostid) + sizeof(in->h.shostid)), sizeof(out->un.query.rhostid)); |
| out->un.query.rhostid = ntohl(out->un.query.rhostid); |
| out->un.query.URL = (char *) ((char *) (&in->h.shostid) + sizeof(in->h.shostid) + sizeof(out->un.query.rhostid)); |
| break; |
| } |
| case ICP_OP_HIT: |
| { |
| out->un.hit.URL = (char *)((char *) (&in->h.shostid) + sizeof(in->h.shostid)); |
| break; |
| } |
| case ICP_OP_MISS: |
| { |
| out->un.miss.URL = (char *)((char *) (&in->h.shostid) + sizeof(in->h.shostid)); |
| break; |
| } |
| case ICP_OP_HIT_OBJ: |
| { |
| out->un.hitobj.URL = (char *)((char *) (&in->h.shostid) + sizeof(in->h.shostid)); |
| |
| // strlen() is bounded since buffer in null terminated. |
| out->un.hitobj.p_objsize = (char *) (out->un.hitobj.URL + strlen(out->un.hitobj.URL)); |
| memcpy((char *) &out->un.hitobj.objsize, out->un.hitobj.p_objsize, sizeof(out->un.hitobj.objsize)); |
| out->un.hitobj.objsize = ntohs(out->un.hitobj.objsize); |
| out->un.hitobj.data = (char *) (out->un.hitobj.p_objsize + sizeof(out->un.hitobj.objsize)); |
| break; |
| } |
| default: |
| break; |
| } |
| } |
| |
| int |
| ICPRequestCont::BuildICPMsg(ICPopcode_t op, unsigned int seqno, |
| int optflags, int optdata, int shostid, |
| void *data, int datalen, struct msghdr *mhdr, struct iovec *iov, ICPMsg_t * icpmsg) |
| { |
| // Build ICP message for transmission in network byte order. |
| if (op == ICP_OP_QUERY) { |
| icpmsg->un.query.rhostid = htonl(0); |
| icpmsg->un.query.URL = (char *) data; |
| |
| mhdr->msg_iov = iov; |
| mhdr->msg_iovlen = 3; |
| |
| iov[0].iov_base = (caddr_t) icpmsg; |
| iov[0].iov_len = sizeof(ICPMsgHdr_t); |
| |
| iov[1].iov_base = (caddr_t) & icpmsg->un.query.rhostid; |
| iov[1].iov_len = sizeof(icpmsg->un.query.rhostid); |
| |
| iov[2].iov_base = (caddr_t) data; |
| iov[2].iov_len = datalen; |
| icpmsg->h.msglen = htons(iov[0].iov_len + iov[1].iov_len + iov[2].iov_len); |
| |
| } else if (op == ICP_OP_HIT) { |
| icpmsg->un.hit.URL = (char *) data; |
| |
| mhdr->msg_iov = iov; |
| mhdr->msg_iovlen = 2; |
| |
| iov[0].iov_base = (caddr_t) icpmsg; |
| iov[0].iov_len = sizeof(ICPMsgHdr_t); |
| |
| iov[1].iov_base = (caddr_t) data; |
| iov[1].iov_len = datalen; |
| icpmsg->h.msglen = htons(iov[0].iov_len + iov[1].iov_len); |
| |
| } else if (op == ICP_OP_MISS) { |
| icpmsg->un.miss.URL = (char *) data; |
| |
| mhdr->msg_iov = iov; |
| mhdr->msg_iovlen = 2; |
| |
| iov[0].iov_base = (caddr_t) icpmsg; |
| iov[0].iov_len = sizeof(ICPMsgHdr_t); |
| |
| iov[1].iov_base = (caddr_t) data; |
| iov[1].iov_len = datalen; |
| icpmsg->h.msglen = htons(iov[0].iov_len + iov[1].iov_len); |
| |
| } else { |
| ink_release_assert(0); |
| return 1; // failed |
| } |
| |
| mhdr->msg_name = (caddr_t) 0; |
| mhdr->msg_namelen = 0; |
| // TODO: The following is just awkward |
| #if !defined(linux) && !defined(freebsd) && !defined(darwin) && !defined(solaris) \ |
| && !defined(openbsd) |
| mhdr->msg_accrights = (caddr_t) 0; |
| mhdr->msg_accrightslen = 0; |
| #elif !defined(solaris) |
| mhdr->msg_control = 0; |
| mhdr->msg_controllen = 0; |
| mhdr->msg_flags = 0; |
| #endif |
| |
| icpmsg->h.opcode = op; |
| icpmsg->h.version = ICP_VERSION_2; |
| icpmsg->h.requestno = htonl(seqno); |
| icpmsg->h.optionflags = htonl(optflags); |
| icpmsg->h.optiondata = htonl(optdata); |
| icpmsg->h.shostid = htonl(shostid); |
| |
| return 0; // Success |
| } |
| |
| // Static ICPRequestCont data declarations |
| unsigned int |
| ICPRequestCont::ICPRequestSeqno = 1; |
| Queue<ICPRequestCont> ICPRequestQueue[ICPRequestCont::ICP_REQUEST_HASH_SIZE]; |
| |
| // Static member function |
| unsigned int |
| ICPRequestCont::ICPReqSeqNumber() |
| { |
| // Generate ICP request sequence numbers. This must be unique. |
| unsigned int res = 0; |
| do { |
| res = (unsigned int) ink_atomic_increment((int *) &ICPRequestSeqno, 1); |
| } while (!res); |
| |
| return res; |
| } |
| |
| // Static member function |
| inline int |
| ICPRequestCont::ICPRequestHash(unsigned int seqno) |
| { |
| // ICPRequestQueue hash |
| return seqno % ICP_REQUEST_HASH_SIZE; |
| } |
| |
| // Static member function |
| int |
| ICPRequestCont::AddICPRequest(unsigned int seqno, ICPRequestCont * r) |
| { |
| // Add ICP request to ICP outstanding queue (ICPRequestQueue). |
| // return: 0 - success |
| |
| ICPRequestQueue[ICPRequestHash(seqno)].enqueue(r); |
| return 0; // Success |
| } |
| |
| // Static member function |
| ICPRequestCont * |
| ICPRequestCont::FindICPRequest(unsigned int seqno) |
| { |
| // Find ICP request on outstanding queue with the given sequence number |
| int hash = ICPRequestHash(seqno); |
| ICPRequestCont *r; |
| |
| for (r = (ICPRequestCont *) ICPRequestQueue[hash].head; r; r = (ICPRequestCont *) r->link.next) { |
| if (r->_sequence_number == seqno) |
| return r; |
| } |
| return (ICPRequestCont *) 0; // Not found |
| } |
| |
| // Static member function |
| int |
| ICPRequestCont::RemoveICPRequest(unsigned int seqno) |
| { |
| // Remove ICP request from outstanding queue with the given |
| // sequence number |
| // Return: 0 - success; 1 - not found |
| |
| if (!seqno) { |
| return 1; // Not found |
| } |
| int hash = ICPRequestHash(seqno); |
| ICPRequestCont *r; |
| |
| for (r = (ICPRequestCont *) ICPRequestQueue[hash].head; r; r = (ICPRequestCont *) r->link.next) { |
| if (r->_sequence_number == seqno) { |
| ICPRequestQueue[hash].remove(r); |
| return 0; |
| } |
| } |
| return 1; // Not found |
| } |
| |
| //------------------------------------------------------------------------ |
| // Class ICPProcessor member functions |
| // Central class which initializes the ICP world. |
| // Delegates incoming message processing to ICPHandlerCont |
| // and outgoing message processing to ICPRequestCont. |
| // Manages the ICP configuration database derived from TS |
| // configuration info. |
| //------------------------------------------------------------------------ |
| |
| // Static data declarations for ICPProcessor |
| void |
| initialize_thread_for_icp(EThread * e) |
| { |
| (void) e; |
| } |
| |
| ICPProcessor icpProcessorInternal; |
| ICPProcessorExt icpProcessor(&icpProcessorInternal); |
| |
| ICPProcessor::ICPProcessor() |
| : _l(0), _Initialized(0), _AllowIcpQueries(0), |
| _PendingIcpQueries(0), _ICPConfig(0), _ICPPeriodic(0), _ICPHandler(0), |
| _mcastCB_handler(NULL), _PeriodicEvent(0), _ICPHandlerEvent(0), |
| _nPeerList(-1), _LocalPeer(0), |
| _curSendPeer(0), _nSendPeerList(-1), |
| _curRecvPeer(0), _nRecvPeerList(-1), _curParentPeer(0), _nParentPeerList(-1), _ValidPollData(0), _last_recv_peer_bias(0) |
| { |
| memset((void *)_PeerList, 0, sizeof(_PeerList[PEER_LIST_SIZE])); |
| memset((void *)_SendPeerList, 0, sizeof(_SendPeerList[SEND_PEER_LIST_SIZE])); |
| memset((void *)_RecvPeerList, 0, sizeof(_RecvPeerList[RECV_PEER_LIST_SIZE])); |
| memset((void *)_ParentPeerList, 0, sizeof(_ParentPeerList[PARENT_PEER_LIST_SIZE])); |
| memset((void *)_PeerIDtoPollIndex, 0, sizeof(_PeerIDtoPollIndex[PEER_ID_POLL_INDEX_SIZE])); |
| } |
| |
| ICPProcessor::~ICPProcessor() |
| { |
| if (_ICPPeriodic) { |
| MUTEX_TAKE_LOCK(_ICPPeriodic->mutex, this_ethread()); |
| _PeriodicEvent->cancel(); |
| Mutex_unlock(_ICPPeriodic->mutex, this_ethread()); |
| } |
| |
| if (_ICPHandler) { |
| MUTEX_TAKE_LOCK(_ICPHandler->mutex, this_ethread()); |
| _ICPHandlerEvent->cancel(); |
| Mutex_unlock(_ICPHandler->mutex, this_ethread()); |
| } |
| } |
| |
| void |
| ICPProcessor::start() |
| { |
| //***************************************************** |
| // Perform initialization actions for ICPProcessor |
| // (called at system startup) |
| //***************************************************** |
| if (_Initialized) // Do only once |
| return; |
| |
| // |
| // Setup ICPProcessor lock, required since ICPProcessor is instantiated |
| // as static object. |
| // |
| _l = NEW(new AtomicLock()); |
| |
| // |
| // Setup custom allocators |
| // |
| // replaced with generic IOBufferBlock allocator |
| ICPHandlerCont::ICPDataBuf_IOBuffer_sizeindex = iobuffer_size_to_index(MAX_ICP_MSGSIZE, MAX_BUFFER_SIZE_INDEX); |
| |
| // |
| // Setup ICP stats callbacks |
| // |
| InitICPStatCallbacks(); |
| |
| // |
| // Create ICP configuration objects |
| // |
| _ICPConfig = NEW(new ICPConfiguration()); |
| |
| _mcastCB_handler = NEW(new ICPHandlerCont(this)); |
| SET_CONTINUATION_HANDLER(_mcastCB_handler, (ICPHandlerContHandler) & ICPHandlerCont::TossEvent); |
| |
| |
| // |
| // Build ICP peer list and setup listen sockets |
| // |
| if (_ICPConfig->globalConfig()->ICPconfigured()) { |
| if (BuildPeerList() == 0) { |
| if (SetupListenSockets() == 0) { |
| _AllowIcpQueries = 1; // allow receipt of queries |
| } |
| } |
| } |
| DumpICPConfig(); |
| |
| // |
| // Start ICP configuration monitor (periodic continuation) |
| // |
| _ICPPeriodic = NEW(new ICPPeriodicCont(this)); |
| SET_CONTINUATION_HANDLER(_ICPPeriodic, (ICPPeriodicContHandler) & ICPPeriodicCont::PeriodicEvent); |
| _PeriodicEvent = eventProcessor.schedule_every(_ICPPeriodic, HRTIME_MSECONDS(ICPPeriodicCont::PERIODIC_INTERVAL), ET_ICP); |
| |
| // |
| // Start ICP receive handler continuation |
| // |
| _ICPHandler = NEW(new ICPHandlerCont(this)); |
| SET_CONTINUATION_HANDLER(_ICPHandler, (ICPHandlerContHandler) & ICPHandlerCont::PeriodicEvent); |
| _ICPHandlerEvent = eventProcessor.schedule_every(_ICPHandler, |
| HRTIME_MSECONDS(ICPHandlerCont::ICP_HANDLER_INTERVAL), ET_ICP); |
| // |
| // Stale lookup data initializations |
| // |
| if (!gclient_request.valid()) { |
| gclient_request.create(HTTP_TYPE_REQUEST); |
| } |
| _Initialized = 1; |
| } |
| |
| Action * |
| ICPProcessor::ICPQuery(Continuation * c, URL * url) |
| { |
| //************************************** |
| // HTTP state machine interface to ICP |
| //************************************** |
| |
| // Build continuation to process ICP request |
| EThread *thread = this_ethread(); |
| ProxyMutex *mutex = thread->mutex; |
| ICPRequestCont *rc = new(ICPRequestCont_allocator.alloc()) ICPRequestCont(this, c, url); |
| |
| ICP_INCREMENT_DYN_STAT(icp_query_requests_stat); |
| |
| rc->SetRequestStartTime(); |
| SET_CONTINUATION_HANDLER(rc, (ICPRequestContHandler) & ICPRequestCont::ICPRequestEvent); |
| eventProcessor.schedule_imm(rc, ET_ICP); |
| |
| return rc->GetActionPtr(); |
| } |
| |
| int |
| ICPProcessor::BuildPeerList() |
| { |
| // Returns 0 on Success |
| |
| // |
| //--------------------------------------------------------------------- |
| // We always place all allocated Peer elements onto PeerList[], |
| // which is used to track allocated elements and validate (ip, port) |
| // uniqueness in the ICP configuration. |
| // |
| // All MultiCastPeer(s) link the underlying ParentSiblingPeer structures |
| // using a singly linked list off the MultiCastPeer. |
| // |
| // Peer elements placed onto SendPeerList[] are elements which are |
| // the target of ICP queries. |
| // In the case where MultiCasting is used, a pseudo peer element |
| // (MultiCastPeer) is placed onto the SendPeerList[] to act as a place |
| // holder for the underlying Peers. |
| // |
| // RecvPeerList[] is the list of Peer(s) we perform reads on for |
| // ICP messages. In the case of MultiCast, the pseudo MultiCast peer |
| // element (MultiCastPeer) is placed on this list. Since we currently |
| // funnel all unicast receives through the local peer UDP socket, |
| // only the local peer and any pseudo MultiCastPeer structures reside |
| // on this list. |
| // |
| // Parent (PEER_PARENT) Peer elements are also added to ParentPeerList |
| // which is used to select a parent in the case where all ICP queries |
| // have returned ICP_MISS. |
| //--------------------------------------------------------------------- |
| // |
| PeerConfigData *Pcfg; |
| Peer *P; |
| Peer *mcP; |
| int index; |
| int status; |
| PeerType_t type; |
| |
| // |
| // From the working copy of the ICP configuration data, build the |
| // internal Peer data structures for ICP processing. |
| // First, establish the Local Peer descriptor before processing |
| // parents and siblings. |
| // |
| Pcfg = _ICPConfig->indexToPeerConfigData(0); |
| ink_strlcpy(Pcfg->_hostname, "localhost", sizeof(Pcfg->_hostname)); |
| Pcfg->_ctype = PeerConfigData::CTYPE_LOCAL; |
| |
| // Get IP address for given interface |
| IpEndpoint tmp_ip; |
| if (!mgmt_getAddrForIntr(GetConfig()->globalConfig()->ICPinterface(), &tmp_ip.sa)) { |
| Pcfg->_ip_addr._family = AF_UNSPEC; |
| // No IP address for given interface |
| Warning("ICP interface [%s] has no IP address", GetConfig()->globalConfig()->ICPinterface()); |
| REC_SignalWarning(REC_SIGNAL_CONFIG_ERROR, "ICP interface has no IP address"); |
| } else { |
| Pcfg->_my_ip_addr = Pcfg->_ip_addr = tmp_ip; |
| } |
| Pcfg->_proxy_port = 0; |
| Pcfg->_icp_port = GetConfig()->globalConfig()->ICPport(); |
| Pcfg->_mc_member = 0; |
| Pcfg->_mc_ip_addr._family = AF_UNSPEC; |
| Pcfg->_mc_ttl = 0; |
| |
| //*************************************************** |
| // Descriptor for local host, add to PeerList and |
| // RecvPeerList |
| //*************************************************** |
| P = NEW(new ParentSiblingPeer(PEER_LOCAL, Pcfg, this)); |
| status = AddPeer(P); |
| ink_release_assert(status); |
| status = AddPeerToRecvList(P); |
| ink_release_assert(status); |
| _LocalPeer = P; |
| |
| for (index = 1; index < MAX_DEFINED_PEERS; ++index) { |
| Pcfg = _ICPConfig->indexToPeerConfigData(index); |
| type = PeerConfigData::CTypeToPeerType_t(Pcfg->GetCType()); |
| // |
| // Ignore parent and sibling entries corresponding to "localhost". |
| // This is possible in a cluster configuration where parents and |
| // siblings are cluster members. Note that in a cluster |
| // configuration, "icp.config" is shared by all nodes. |
| // |
| if (Pcfg->GetIPAddr() == _LocalPeer->GetIP()) |
| continue; // ignore |
| |
| if ((type == PEER_PARENT) || (type == PEER_SIBLING)) { |
| |
| if (Pcfg->MultiCastMember()) { |
| mcP = FindPeer(Pcfg->GetMultiCastIPAddr(), Pcfg->GetICPPort()); |
| if (!mcP) { |
| //********************************* |
| // Create multicast peer structure |
| //********************************* |
| mcP = NEW(new MultiCastPeer(Pcfg->GetMultiCastIPAddr(), Pcfg->GetICPPort(), Pcfg->GetMultiCastTTL(), this)); |
| status = AddPeer(mcP); |
| ink_assert(status); |
| status = AddPeerToSendList(mcP); |
| ink_assert(status); |
| status = AddPeerToRecvList(mcP); |
| ink_assert(status); |
| } |
| //***************************** |
| // Add child to MultiCast peer |
| //***************************** |
| P = NEW(new ParentSiblingPeer(type, Pcfg, this)); |
| status = AddPeer(P); |
| ink_assert(status); |
| status = ((MultiCastPeer *) mcP)->AddMultiCastChild(P); |
| ink_assert(status); |
| |
| } else { |
| //***************************** |
| // Add parent/sibling peer |
| //***************************** |
| P = NEW(new ParentSiblingPeer(type, Pcfg, this)); |
| status = AddPeer(P); |
| ink_assert(status); |
| status = AddPeerToSendList(P); |
| ink_assert(status); |
| } |
| //**************************************** |
| // Also, add parent peers to parent list. |
| //**************************************** |
| if (type == PEER_PARENT) { |
| status = AddPeerToParentList(P); |
| ink_assert(status); |
| } |
| } |
| } |
| return 0; // Success |
| } |
| |
| void |
| ICPProcessor::FreePeerList() |
| { |
| // Deallocate all Peer structures |
| int index; |
| for (index = 0; index < (_nPeerList + 1); ++index) { |
| if (_PeerList[index]) { |
| _PeerList[index] = 0; |
| } |
| } |
| // Reset all control data |
| _nPeerList = -1; |
| _LocalPeer = (Peer *) 0; |
| _curSendPeer = 0; |
| _nSendPeerList = -1; |
| _curRecvPeer = 0; |
| _nRecvPeerList = -1; |
| _curParentPeer = 0; |
| _nParentPeerList = -1; |
| _ValidPollData = 0; |
| _last_recv_peer_bias = 0; |
| |
| for (index = 0; index < PEER_LIST_SIZE; index++) { |
| _PeerList[index] = 0; |
| } |
| for (index = 0; index < SEND_PEER_LIST_SIZE; index++) { |
| _SendPeerList[index] = 0; |
| } |
| for (index = 0; index < RECV_PEER_LIST_SIZE; index++) { |
| _RecvPeerList[index] = 0; |
| } |
| for (index = 0; index < PARENT_PEER_LIST_SIZE; index++) { |
| _ParentPeerList[index] = 0; |
| } |
| memset((void *) _PeerIDtoPollIndex, 0, sizeof(_PeerIDtoPollIndex[PEER_ID_POLL_INDEX_SIZE])); |
| } |
| |
| int |
| ICPProcessor::SetupListenSockets() |
| { |
| int allow_null_configuration; |
| |
| if ((_ICPConfig->globalConfig()->ICPconfigured() == ICP_MODE_RECEIVE_ONLY) |
| && _ICPConfig->globalConfig()->ICPReplyToUnknownPeer()) { |
| allow_null_configuration = 1; |
| } else { |
| allow_null_configuration = 0; |
| } |
| |
| // Returns 0 on Success. |
| |
| // |
| // Perform some basic sanity checks on the ICP configuration. |
| // |
| if (!_LocalPeer) { |
| Warning("ICP setup, no defined local Peer"); |
| REC_SignalWarning(REC_SIGNAL_CONFIG_ERROR, "ICP setup, no defined local Peer"); |
| return 1; // Failed |
| } |
| |
| if (GetSendPeers() == 0) { |
| if (!allow_null_configuration) { |
| Warning("ICP setup, no defined send Peer(s)"); |
| REC_SignalWarning(REC_SIGNAL_CONFIG_ERROR, "ICP setup, no defined send Peer(s)"); |
| return 1; // Failed |
| } |
| } |
| if (GetRecvPeers() == 0) { |
| if (!allow_null_configuration) { |
| Warning("ICP setup, no defined receive Peer(s)"); |
| REC_SignalWarning(REC_SIGNAL_CONFIG_ERROR, "ICP setup, no defined receive Peer(s)"); |
| return 1; // Failed |
| } |
| } |
| // |
| // Establish the required sockets for elements on the PeerList[]. |
| // |
| Peer *P; |
| int status; |
| int index; |
| for (index = 0; index < (_nPeerList + 1); ++index) { |
| ip_port_text_buffer ipb, ipb2; |
| |
| if ((P = _PeerList[index])) { |
| |
| if ((P->GetType() == PEER_PARENT) |
| || (P->GetType() == PEER_SIBLING)) { |
| ParentSiblingPeer *pPS = (ParentSiblingPeer *) P; |
| |
| pPS->GetChan()->setRemote(pPS->GetIP()); |
| |
| } else if (P->GetType() == PEER_MULTICAST) { |
| MultiCastPeer *pMC = (MultiCastPeer *) P; |
| ink_assert(_mcastCB_handler != NULL); |
| status = pMC->GetSendChan()->setup_mc_send(pMC->GetIP(), _LocalPeer->GetIP(), NON_BLOCKING, pMC->GetTTL(), DISABLE_MC_LOOPBACK, _mcastCB_handler); |
| if (status) { |
| // coverity[uninit_use_in_call] |
| Warning("ICP MC send setup failed, res=%d, ip=%s bind_ip=%s", |
| status, |
| ats_ip_nptop(pMC->GetIP(), ipb, sizeof(ipb)), |
| ats_ip_nptop(_LocalPeer->GetIP(), ipb2, sizeof(ipb2)) |
| ); |
| REC_SignalWarning(REC_SIGNAL_CONFIG_ERROR, "ICP MC send setup failed"); |
| return 1; // Failed |
| } |
| |
| status = pMC->GetRecvChan()->setup_mc_receive(pMC->GetIP(), |
| NON_BLOCKING, pMC->GetSendChan(), _mcastCB_handler); |
| if (status) { |
| // coverity[uninit_use_in_call] |
| Warning("ICP MC recv setup failed, res=%d, ip=%s", |
| status, ats_ip_nptop(pMC->GetIP(), ipb, sizeof(ipb))); |
| REC_SignalWarning(REC_SIGNAL_CONFIG_ERROR, "ICP MC recv setup failed"); |
| return 1; // Failed |
| } |
| } |
| } |
| } |
| // |
| // Setup the socket for the local host. |
| // We funnel all unicast sends and receives through |
| // the local peer UDP socket. |
| // |
| ParentSiblingPeer *pPS = (ParentSiblingPeer *) ((Peer *) _LocalPeer); |
| |
| pPS->GetChan()->setRemote(pPS->GetIP()); |
| return 0; // Success |
| } |
| |
| void |
| ICPProcessor::ShutdownListenSockets() |
| { |
| // |
| // Close all open sockets for elements on the PeerList[] |
| // |
| ink_assert(!PendingQuery()); |
| Peer *P; |
| |
| int index; |
| for (index = 0; index < (_nPeerList + 1); ++index) { |
| if ((P = _PeerList[index])) { |
| if (P->GetType() == PEER_LOCAL) { |
| ParentSiblingPeer *pPS = (ParentSiblingPeer *) P; |
| (void) pPS->GetChan()->close(); |
| |
| } else if (P->GetType() == PEER_MULTICAST) { |
| MultiCastPeer *pMC = (MultiCastPeer *) P; |
| (void) pMC->GetSendChan()->close(); |
| (void) pMC->GetRecvChan()->close(); |
| } |
| } |
| } |
| } |
| |
| int |
| ICPProcessor::Reconfigure(int global_config_changed, int peer_config_changed) |
| { |
| // Returns 0 on Success |
| |
| NOWARN_UNUSED(global_config_changed); |
| NOWARN_UNUSED(peer_config_changed); |
| // |
| // At this point, ICP requests processing is disabled and |
| // no pending ICP requests exist. |
| // |
| ink_assert(_ICPConfig->HaveLock()); |
| ink_assert(!AllowICPQueries()); |
| ink_assert(!PendingQuery()); |
| // |
| // Shutdown and deallocate all structures associated with the |
| // current configuration. |
| // |
| ShutdownListenSockets(); |
| FreePeerList(); |
| // |
| // Copy the new configuration into the working copy and |
| // rebuild all associated structures. |
| // |
| _ICPConfig->UpdateGlobalConfig(); |
| _ICPConfig->UpdatePeerConfig(); |
| |
| int status = -1; |
| if (_ICPConfig->globalConfig()->ICPconfigured()) { |
| if ((status = BuildPeerList()) == 0) { |
| status = SetupListenSockets(); |
| } |
| DumpICPConfig(); |
| } |
| return status; |
| } |
| |
| ICPProcessor::ReconfigState_t |
| ICPProcessor::ReconfigureStateMachine(ReconfigState_t s, int gconfig_changed, int pconfig_changed) |
| { |
| //***************************************************************** |
| // State machine which performs the ICP reconfiguration actions. |
| // Defined states are as follows: |
| // 1) (RC_RECONFIG) disable ICP, reconfigure if no request pending, |
| // else delay and retry. Reconfigure and if success move to |
| // RC_ENABLE_ICP else RC_DONE. |
| // 2) (RC_ENABLE_ICP) enable ICP, free ICP configuration lock. |
| // 3) (RC_DONE) free ICP configuration lock. |
| //***************************************************************** |
| ink_assert(_ICPConfig->HaveLock()); |
| int reconfig_status; |
| |
| while (1) { |
| |
| switch (s) { |
| case RC_RECONFIG: |
| { |
| if (!Lock()) |
| return RC_RECONFIG; // Unable to get lock, try again |
| |
| if (PendingQuery()) { |
| DisableICPQueries(); // disable ICP processing |
| Unlock(); |
| CancelPendingReads(); |
| return RC_RECONFIG; // Pending requests, delay and retry |
| |
| } else { |
| DisableICPQueries(); // disable ICP processing |
| Unlock(); |
| // No pending ICP queries, perform reconfiguration |
| reconfig_status = Reconfigure(gconfig_changed, pconfig_changed); |
| |
| if (reconfig_status == 0) { |
| s = RC_ENABLE_ICP; // reconfig OK, enable ICP |
| } else { |
| s = RC_DONE; // reconfig failed, do not enable ICP |
| } |
| break; // move to next state |
| } |
| } |
| |
| case RC_ENABLE_ICP: |
| { |
| if (!Lock()) |
| return RC_ENABLE_ICP; // Unable to get lock, try again |
| |
| EnableICPQueries(); // Enable ICP processing |
| Unlock(); |
| |
| s = RC_DONE; |
| break; // move to next state |
| } |
| |
| case RC_DONE: |
| { |
| // Release configuration lock |
| _ICPConfig->Unlock(); |
| return RC_DONE; // Reconfiguration complete |
| } |
| default: |
| { |
| ink_release_assert(0); // Should never happen |
| } |
| |
| } // End of switch |
| |
| } // End of while |
| #if !defined(__GNUC__) |
| _exit_while: // fix DEC warnings |
| #endif |
| return RC_DONE; |
| } |
| |
| void |
| ICPProcessor::CancelPendingReads() |
| { |
| // Cancel pending ICP read by sending a bogus message to |
| // the local ICP port. |
| |
| ICPRequestCont *r = new(ICPRequestCont_allocator.alloc()) |
| ICPRequestCont(this, NULL, NULL); |
| SET_CONTINUATION_HANDLER(r, (ICPRequestContHandler) & ICPRequestCont::NopICPRequestEvent); |
| r->mutex = new_ProxyMutex(); |
| |
| // TODO: Check return value? |
| ICPRequestCont::BuildICPMsg(ICP_OP_HIT, 0, 0 /* optflags */ , 0 /* optdata */ , 0 /* shostid */ , |
| (void *) 0, 0, &r->_sendMsgHdr, r->_sendMsgIOV, &r->_ICPmsg); |
| r->_sendMsgHdr.msg_iovlen = 1; |
| r->_ICPmsg.h.version = ~r->_ICPmsg.h.version; // bogus message |
| |
| Peer *lp = GetLocalPeer(); |
| r->_sendMsgHdr.msg_name = (caddr_t) & (lp->GetSendChan())->addr; |
| r->_sendMsgHdr.msg_namelen = sizeof((lp->GetSendChan())->addr); |
| udpNet.sendmsg_re(r, r, lp->GetSendFD(), &r->_sendMsgHdr); |
| } |
| |
| Peer * |
| ICPProcessor::GenericFindListPeer(IpAddr const& ip, uint16_t port, int validListItems, Ptr<Peer> *List) |
| { |
| Peer *P; |
| port = htons(port); |
| for (int n = 0; n < validListItems; ++n) { |
| if ((P = List[n])) { |
| if ((P->GetIP() == ip) |
| && ((port == 0) || (ats_ip_port_cast(P->GetIP()) == port))) |
| return P; |
| } |
| } |
| return NULL; |
| } |
| |
| Peer * |
| ICPProcessor::FindPeer(IpAddr const& ip, uint16_t port) |
| { |
| // Find (Peer *) with the given (ip,port) on the global list (PeerList) |
| return GenericFindListPeer(ip, port, (_nPeerList + 1), _PeerList); |
| } |
| |
| Peer * |
| ICPProcessor::FindSendListPeer(IpAddr const& ip, uint16_t port) |
| { |
| // Find (Peer *) with the given (ip,port) on the |
| // scheduler list (SendPeerList) |
| return GenericFindListPeer(ip, port, (_nSendPeerList + 1), _SendPeerList); |
| } |
| |
| Peer * |
| ICPProcessor::FindRecvListPeer(IpAddr const& ip, uint16_t port) |
| { |
| // Find (Peer *) with the given (ip,port) on the |
| // receive list (RecvPeerList) |
| return GenericFindListPeer(ip, port, (_nRecvPeerList + 1), _RecvPeerList); |
| } |
| |
| int |
| ICPProcessor::AddPeer(Peer * P) |
| { |
| // Add (Peer *) to the global list (PeerList). Make sure (ip,port) is |
| // unique. |
| // Returns 1 - added; 0 - Not added |
| |
| // |
| // Make sure no duplicate exists |
| // |
| if (FindPeer(P->GetIP())) { |
| ip_port_text_buffer x; |
| // coverity[uninit_use_in_call] |
| Warning("bad icp.config, multiple peer definitions for ip=%s", ats_ip_nptop(P->GetIP(), x, sizeof(x))); |
| REC_SignalWarning(REC_SIGNAL_CONFIG_ERROR, "bad icp.config, multiple peer definitions"); |
| |
| return 0; // Not added |
| } else { |
| // Valid entry |
| if (_nPeerList + 1 < PEER_LIST_SIZE) { |
| _nPeerList++; |
| _PeerList[_nPeerList] = P; |
| P->SetPeerID(_nPeerList); |
| return 1; // Added |
| } else { |
| return 0; // Not added |
| } |
| } |
| } |
| |
| int |
| ICPProcessor::AddPeerToRecvList(Peer * P) |
| { |
| // Add (Peer *) to the listen list (RecvPeerList). |
| // Make sure (ip,port) is unique. |
| // Returns 1 - added; 0 - Not added |
| |
| // Assert that no duplicate exists |
| ink_assert(FindRecvListPeer(IpAddr(P->GetIP()), ats_ip_port_host_order(P->GetIP())) == 0); |
| |
| if (_nRecvPeerList + 1 < RECV_PEER_LIST_SIZE) { |
| _nRecvPeerList++; |
| _RecvPeerList[_nRecvPeerList] = P; |
| return 1; // Added |
| } else { |
| return 0; // Not added |
| } |
| } |
| |
| int |
| ICPProcessor::AddPeerToSendList(Peer * P) |
| { |
| // Add (Peer *) to the scheduler list (SendPeerList). |
| // Make sure (ip,port) is unique. |
| // Returns 1 - added; 0 - Not added |
| |
| // Assert that no duplicate exists |
| ink_assert(FindSendListPeer(IpAddr(P->GetIP()), ats_ip_port_host_order(P->GetIP())) == 0); |
| |
| if (_nSendPeerList + 1 < SEND_PEER_LIST_SIZE) { |
| _nSendPeerList++; |
| _SendPeerList[_nSendPeerList] = P; |
| return 1; // Added |
| } else { |
| return 0; // Not added |
| } |
| } |
| |
| int |
| ICPProcessor::AddPeerToParentList(Peer * P) |
| { |
| // Add (Peer *) to the parent list (ParentPeerList). |
| // Returns 1 - added; 0 - Not added |
| |
| if (_nParentPeerList + 1 < PARENT_PEER_LIST_SIZE) { |
| _nParentPeerList++; |
| _ParentPeerList[_nParentPeerList] = P; |
| return 1; // Added |
| } else { |
| return 0; // Not added |
| } |
| } |
| |
| // End of ICP.cc |