/** @file
Cluster cache RPC support: issue cache operations to remote nodes and service operations received from them.
@section license License
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
/****************************************************************************
ClusterCache.cc
****************************************************************************/
#include "P_Cluster.h"
#ifdef DEBUG
#define CLUSTER_TEST_DEBUG 1
#endif
#ifdef ENABLE_TIME_TRACE
int callback_time_dist[TIME_DIST_BUCKETS_SIZE];
int cache_callbacks = 0;
int rmt_callback_time_dist[TIME_DIST_BUCKETS_SIZE];
int rmt_cache_callbacks = 0;
int lkrmt_callback_time_dist[TIME_DIST_BUCKETS_SIZE];
int lkrmt_cache_callbacks = 0;
int cntlck_acquire_time_dist[TIME_DIST_BUCKETS_SIZE];
int cntlck_acquire_events = 0;
int open_delay_time_dist[TIME_DIST_BUCKETS_SIZE];
int open_delay_events = 0;
#endif // ENABLE_TIME_TRACE
// Default value; the operative setting is read from config at startup.
int cache_migrate_on_demand = false;
/////////////////
// Static Data //
/////////////////
static ClassAllocator<CacheContinuation> cacheContAllocator("cacheContAllocator");
static Queue<CacheContinuation> remoteCacheContQueue[REMOTE_CONNECT_HASH];
static Ptr<ProxyMutex> remoteCacheContQueueMutex[REMOTE_CONNECT_HASH];
// 0 is an illegal sequence number
#define CACHE_NO_RESPONSE 0
static int cluster_sequence_number = 1;
#ifdef CLUSTER_TEST_DEBUG
static ink_hrtime cache_cluster_timeout = HRTIME_SECONDS(65536);
#else
static ink_hrtime cache_cluster_timeout = CACHE_CLUSTER_TIMEOUT;
#endif
///////////////////
// Declarations //
///////////////////
static CacheContinuation *find_cache_continuation(unsigned int, unsigned int);
static unsigned int new_cache_sequence_number();
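//
// Sketch only -- the real definition appears later in this file. The
// generator must never hand out 0, since 0 is reserved as
// CACHE_NO_RESPONSE. Assuming the usual fetch-and-add semantics of
// ink_atomic_increment(), a minimal version would be:
//
//   static unsigned int
//   new_cache_sequence_number()
//   {
//     unsigned int res;
//     do {
//       res = (unsigned int) ink_atomic_increment(&cluster_sequence_number, 1);
//     } while (!res);  // skip the reserved value 0 on wraparound
//     return res;
//   }
//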
#define DOT_SEPARATED(_x) \
((unsigned char*)&(_x))[0], ((unsigned char*)&(_x))[1], \
((unsigned char*)&(_x))[2], ((unsigned char*)&(_x))[3]
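// DOT_SEPARATED() expands an IP address into its four bytes, in memory
// order, for use as printf-style arguments, e.g.:
//   Debug("chan_inuse", "tok.ip=%u.%u.%u.%u", DOT_SEPARATED(token.ip_created));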
#define ET_CACHE_CONT_SM ET_NET
#define ALLOW_THREAD_STEAL true
/**********************************************************************/
#ifdef CACHE_MSG_TRACE
/**********************************************************************/
/**********************************************************************/
// Debug trace support for cache RPC messages
/**********************************************************************/
#define MAX_TENTRIES 4096
struct traceEntry
{
unsigned int seqno;
int op;
char *type;
};
struct traceEntry recvTraceTable[MAX_TENTRIES];
struct traceEntry sndTraceTable[MAX_TENTRIES];
static int recvTraceTable_index = 0;
static int sndTraceTable_index = 0;
void
log_cache_op_msg(unsigned int seqno, int op, char *type)
{
// Use the value returned by the atomic increment; re-reading the shared
// index here would race with concurrent callers.
int n = ink_atomic_increment(&recvTraceTable_index, 1) % MAX_TENTRIES;
recvTraceTable[n].seqno = seqno;
recvTraceTable[n].op = op;
recvTraceTable[n].type = type;
}
void
log_cache_op_sndmsg(unsigned int seqno, int op, char *type)
{
int n = ink_atomic_increment(&sndTraceTable_index, 1) % MAX_TENTRIES;
sndTraceTable[n].seqno = seqno;
sndTraceTable[n].op = op;
sndTraceTable[n].type = type;
}
void
dump_recvtrace_table()
{
int n;
printf("\n");
for (n = 0; n < MAX_TENTRIES; ++n)
printf("[%d] seqno=%d, op=%d type=%s\n", n, recvTraceTable[n].seqno,
recvTraceTable[n].op, recvTraceTable[n].type ? recvTraceTable[n].type : "");
}
void
dump_sndtrace_table()
{
int n;
printf("\n");
for (n = 0; n < MAX_TENTRIES; ++n)
printf("[%d] seqno=%d, op=%d type=%s\n", n, sndTraceTable[n].seqno,
sndTraceTable[n].op, sndTraceTable[n].type ? sndTraceTable[n].type : "");
}
/**********************************************************************/
#endif // CACHE_MSG_TRACE
/**********************************************************************/
///////////////////////////////////////////////////////////////////////
// Cluster write VC cache.
///////////////////////////////////////////////////////////////////////
//
// When a remote open read fails (HTTP only), an open write is issued
// and, if successful, an open write connection is returned for the open
// read. We cache the open write VC and resolve the subsequent open
// write locally from the write VC cache, keyed by the INK_MD5 of the URL.
// Note that this is a global per-node cache.
///////////////////////////////////////////////////////////////////////
class ClusterVConnectionCache
{
public:
ClusterVConnectionCache()
{
memset(hash_event, 0, sizeof(hash_event));
}
void init();
int MD5ToIndex(INK_MD5 * p);
int insert(INK_MD5 *, ClusterVConnection *);
ClusterVConnection *lookup(INK_MD5 *);
public:
struct Entry
{
LINK(Entry, link);
bool mark_for_delete;
INK_MD5 key;
ClusterVConnection *vc;
Entry():mark_for_delete(0), vc(0)
{
}
~Entry()
{
}
};
enum
{ MAX_TABLE_ENTRIES = 256, // must be power of 2
SCAN_INTERVAL = 10 // seconds
};
Queue<Entry> hash_table[MAX_TABLE_ENTRIES];
Ptr<ProxyMutex> hash_lock[MAX_TABLE_ENTRIES];
Event *hash_event[MAX_TABLE_ENTRIES];
};
static ClassAllocator <
ClusterVConnectionCache::Entry >
ClusterVCCacheEntryAlloc("ClusterVConnectionCache::Entry");
ClusterVConnectionCache *GlobalOpenWriteVCcache = 0;
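//
// Typical use, in sketch form (see do_op() and lookupOpenWriteVC() below):
// when a failed remote open read is converted to an open write, the
// resulting VC is inserted keyed by the URL MD5; a later open write for
// the same URL is then resolved locally:
//
//   GlobalOpenWriteVCcache->insert(&url_md5, vc);
//   ...
//   ClusterVConnection *vc = GlobalOpenWriteVCcache->lookup(&url_md5);
//   // NULL => lock miss (retry later), (ClusterVConnection *) - 1 => miss
//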
/////////////////////////////////////////////////////////////////
// Perform periodic purges of ClusterVConnectionCache entries
/////////////////////////////////////////////////////////////////
class ClusterVConnectionCacheEvent:public Continuation
{
public:
ClusterVConnectionCacheEvent(ClusterVConnectionCache * c, int n)
: Continuation(new_ProxyMutex()), cache(c), hash_index(n)
{
SET_HANDLER(&ClusterVConnectionCacheEvent::eventHandler);
}
int eventHandler(int, Event *);
private:
ClusterVConnectionCache * cache;
int hash_index;
};
void
ClusterVConnectionCache::init()
{
int n;
ClusterVConnectionCacheEvent *eh;
for (n = 0; n < MAX_TABLE_ENTRIES; ++n) {
hash_lock[n] = new_ProxyMutex();
}
for (n = 0; n < MAX_TABLE_ENTRIES; ++n) {
// Set up periodic purge events on each hash list
eh = new ClusterVConnectionCacheEvent(this, n);
hash_event[n] =
eventProcessor.schedule_in(eh, HRTIME_SECONDS(ClusterVConnectionCache::SCAN_INTERVAL), ET_CACHE_CONT_SM);
}
}
inline int
ClusterVConnectionCache::MD5ToIndex(INK_MD5 * p)
{
uint64_t i = p->fold();
int32_t h, l;
h = i >> 32;
l = i & 0xFFFFFFFF;
// MAX_TABLE_ENTRIES is a power of 2, so the mask alone yields a valid
// non-negative index; the redundant modulus has been dropped.
return (h ^ l) & (MAX_TABLE_ENTRIES - 1);
}
int
ClusterVConnectionCache::insert(INK_MD5 * key, ClusterVConnection * vc)
{
int index = MD5ToIndex(key);
Entry *e;
EThread *thread = this_ethread();
ProxyMutex *mutex = thread->mutex;
MUTEX_TRY_LOCK(lock, hash_lock[index], thread);
if (!lock) {
CLUSTER_INCREMENT_DYN_STAT(CLUSTER_VC_CACHE_INSERT_LOCK_MISSES_STAT);
return 0; // lock miss, retry later
} else {
// Add entry to list
e = ClusterVCCacheEntryAlloc.alloc();
e->key = *key;
e->vc = vc;
hash_table[index].enqueue(e);
CLUSTER_INCREMENT_DYN_STAT(CLUSTER_VC_CACHE_INSERTS_STAT);
}
return 1; // Success
}
ClusterVConnection *
ClusterVConnectionCache::lookup(INK_MD5 * key)
{
int index = MD5ToIndex(key);
Entry *e;
ClusterVConnection *vc = 0;
EThread *thread = this_ethread();
ProxyMutex *mutex = thread->mutex;
MUTEX_TRY_LOCK(lock, hash_lock[index], thread);
if (!lock) {
CLUSTER_INCREMENT_DYN_STAT(CLUSTER_VC_CACHE_LOOKUP_LOCK_MISSES_STAT);
return vc; // lock miss, retry later
} else {
e = hash_table[index].head;
while (e) {
if (*key == e->key) { // Hit
vc = e->vc;
hash_table[index].remove(e);
ClusterVCCacheEntryAlloc.free(e);
CLUSTER_INCREMENT_DYN_STAT(CLUSTER_VC_CACHE_LOOKUP_HITS_STAT);
return vc;
} else {
e = e->link.next;
}
}
}
CLUSTER_INCREMENT_DYN_STAT(CLUSTER_VC_CACHE_LOOKUP_MISSES_STAT);
return (ClusterVConnection *) - 1; // Miss
}
int
ClusterVConnectionCacheEvent::eventHandler(int /* event ATS_UNUSED */, Event * e)
{
CLUSTER_INCREMENT_DYN_STAT(CLUSTER_VC_CACHE_SCANS_STAT);
MUTEX_TRY_LOCK(lock, cache->hash_lock[hash_index], this_ethread());
if (!lock) {
CLUSTER_INCREMENT_DYN_STAT(CLUSTER_VC_CACHE_SCAN_LOCK_MISSES_STAT);
e->schedule_in(HRTIME_MSECONDS(10));
return EVENT_DONE;
}
// Purge unreferenced VC(s) with a two-pass mark-and-sweep: an entry is
// marked on one scan and freed on the next unless a lookup() hit removed
// it in between, giving each cached VC roughly one SCAN_INTERVAL of grace.
ClusterVConnectionCache::Entry * entry;
ClusterVConnectionCache::Entry * next_entry;
entry = cache->hash_table[hash_index].head;
while (entry) {
if (entry->mark_for_delete) {
next_entry = entry->link.next;
cache->hash_table[hash_index].remove(entry);
entry->vc->allow_remote_close();
entry->vc->do_io(VIO::CLOSE);
ClusterVCCacheEntryAlloc.free(entry);
entry = next_entry;
CLUSTER_INCREMENT_DYN_STAT(CLUSTER_VC_CACHE_PURGES_STAT);
} else {
entry->mark_for_delete = true;
entry = entry->link.next;
}
}
// Setup for next purge event
e->schedule_in(HRTIME_SECONDS(ClusterVConnectionCache::SCAN_INTERVAL), ET_CACHE_CONT_SM);
return EVENT_DONE;
}
///////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////
// init()
// Global initializations for CacheContinuation
////////////////////////////////////////////////////
int
CacheContinuation::init()
{
int n;
for (n = 0; n < REMOTE_CONNECT_HASH; ++n)
remoteCacheContQueueMutex[n] = new_ProxyMutex();
GlobalOpenWriteVCcache = new ClusterVConnectionCache;
GlobalOpenWriteVCcache->init();
return 0;
}
///////////////////////////////////////////////////////////////////////
// do_op()
// Main function to do a cluster cache operation
///////////////////////////////////////////////////////////////////////
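//
// Caller-side sketch with hypothetical values: issue a CACHE_REMOVE
// against a remote node. 'args' carries the operation parameters and
// 'data' the caller-allocated message buffer, which do_op() initializes
// and sends:
//
//   CacheOpArgs_General args;
//   args.url_md5 = &md5;                    // key of the doc to remove
//   args.cfl_flags = 0;
//   args.frag_type = CACHE_FRAG_TYPE_HTTP;
//   int len = op_to_sizeof_fixedlen_msg(CACHE_REMOVE);
//   Action *a = CacheContinuation::do_op(cont, m, (void *) &args, CACHE_REMOVE,
//                                        (char *) ALLOCA_DOUBLE(len), len,
//                                        -1, (MIOBuffer *) 0);
//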
Action *
CacheContinuation::do_op(Continuation * c, ClusterMachine * mp, void *args,
int user_opcode, char *data, int data_len, int nbytes, MIOBuffer * b)
{
CacheContinuation *cc = 0;
Action *act = 0;
char *msg = 0;
ClusterHandler *ch = mp->pop_ClusterHandler();
/////////////////////////////////////////////////////////////////////
// Open read buffer interfaces are deprecated; unconditionally map
// them to the corresponding open read opcodes.
/////////////////////////////////////////////////////////////////////
int opcode = user_opcode;
switch (opcode) {
case CACHE_OPEN_READ_BUFFER:
opcode = CACHE_OPEN_READ;
break;
case CACHE_OPEN_READ_BUFFER_LONG:
opcode = CACHE_OPEN_READ_LONG;
break;
default:
break;
}
if (!ch)
goto no_send_exit;
if (c) {
cc = cacheContAllocator_alloc();
cc->ch = ch;
cc->target_machine = mp;
cc->request_opcode = opcode;
cc->mutex = c->mutex;
cc->action = c;
cc->action.cancelled = false;
cc->start_time = ink_get_hrtime();
cc->from = mp;
cc->result = op_failure(opcode);
SET_CONTINUATION_HANDLER(cc, (CacheContHandler)
& CacheContinuation::remoteOpEvent);
act = &cc->action;
// set up sequence number so we can find this continuation
cc->target_ip = mp->ip;
cc->seq_number = new_cache_sequence_number();
// establish timeout for cache op
unsigned int hash = FOLDHASH(cc->target_ip, cc->seq_number);
MUTEX_TRY_LOCK(queuelock, remoteCacheContQueueMutex[hash], this_ethread());
if (!queuelock) {
// failed to acquire lock: no problem, retry later
cc->timeout = eventProcessor.schedule_in(cc, CACHE_RETRY_PERIOD, ET_CACHE_CONT_SM);
} else {
remoteCacheContQueue[hash].enqueue(cc);
MUTEX_RELEASE(queuelock);
cc->timeout = eventProcessor.schedule_in(cc, cache_cluster_timeout, ET_CACHE_CONT_SM);
}
}
//
// Determine the type of the "Over The Wire" (OTW) message header and
// initialize it.
//
Debug("cache_msg",
"do_op opcode=%d seqno=%d Machine=%p data=%p datalen=%d mio=%p",
opcode, (c ? cc->seq_number : CACHE_NO_RESPONSE), mp, data, data_len, b);
switch (opcode) {
case CACHE_OPEN_WRITE_BUFFER:
case CACHE_OPEN_WRITE_BUFFER_LONG:
{
ink_release_assert(!"write buffer not supported");
break;
}
case CACHE_OPEN_READ_BUFFER:
case CACHE_OPEN_READ_BUFFER_LONG:
{
ink_release_assert(!"read buffer not supported");
break;
}
case CACHE_OPEN_WRITE:
case CACHE_OPEN_READ:
{
ink_release_assert(c != NULL);
//////////////////////
// Use short format //
//////////////////////
if (!data) {
data_len = op_to_sizeof_fixedlen_msg(opcode);
data = (char *) ALLOCA_DOUBLE(data_len);
}
msg = (char *) data;
CacheOpMsg_short *m = (CacheOpMsg_short *) msg;
m->init();
m->opcode = opcode;
m->cfl_flags = ((CacheOpArgs_General *) args)->cfl_flags;
m->md5 = *((CacheOpArgs_General *) args)->url_md5;
cc->url_md5 = m->md5;
m->seq_number = (c ? cc->seq_number : CACHE_NO_RESPONSE);
m->frag_type = ((CacheOpArgs_General *) args)->frag_type;
if (opcode == CACHE_OPEN_WRITE) {
m->nbytes = nbytes;
m->data = (uint32_t) ((CacheOpArgs_General *) args)->pin_in_cache;
} else {
m->nbytes = 0;
m->data = 0;
}
if (opcode == CACHE_OPEN_READ) {
//
// Set upper limit on initial data received with response
// for open read response
//
m->buffer_size = DEFAULT_MAX_BUFFER_SIZE;
} else {
m->buffer_size = 0;
}
//
// Establish the local VC
//
int res = setup_local_vc(msg, data_len, cc, mp, &act);
if (!res) {
/////////////////////////////////////////////////////
// Unable to setup local VC, request aborted.
// Remove request from pending list and deallocate.
/////////////////////////////////////////////////////
cc->remove_and_delete(0, (Event *) 0);
return act;
} else if (res != -1) {
///////////////////////////////////////
// VC established, send request
///////////////////////////////////////
break;
} else {
//////////////////////////////////////////////////////
// Unable to setup VC, delay required, await callback
//////////////////////////////////////////////////////
goto no_send_exit;
}
}
case CACHE_OPEN_READ_LONG:
case CACHE_OPEN_WRITE_LONG:
{
ink_release_assert(c != NULL);
//////////////////////
// Use long format //
//////////////////////
msg = data;
CacheOpMsg_long *m = (CacheOpMsg_long *) msg;
m->init();
m->opcode = opcode;
m->cfl_flags = ((CacheOpArgs_General *) args)->cfl_flags;
m->url_md5 = *((CacheOpArgs_General *) args)->url_md5;
cc->url_md5 = m->url_md5;
m->seq_number = (c ? cc->seq_number : CACHE_NO_RESPONSE);
m->nbytes = nbytes;
m->data = (uint32_t) ((CacheOpArgs_General *) args)->pin_in_cache;
m->frag_type = (uint32_t) ((CacheOpArgs_General *) args)->frag_type;
if (opcode == CACHE_OPEN_READ_LONG) {
//
// Set upper limit on initial data received with response
// for open read response
//
m->buffer_size = DEFAULT_MAX_BUFFER_SIZE;
} else {
m->buffer_size = 0;
}
//
// Establish the local VC
//
int res = setup_local_vc(msg, data_len, cc, mp, &act);
if (!res) {
/////////////////////////////////////////////////////
// Unable to setup local VC, request aborted.
// Remove request from pending list and deallocate.
/////////////////////////////////////////////////////
cc->remove_and_delete(0, (Event *) 0);
return act;
} else if (res != -1) {
///////////////////////////////////////
// VC established, send request
///////////////////////////////////////
break;
} else {
//////////////////////////////////////////////////////
// Unable to setup VC, delay required, await callback
//////////////////////////////////////////////////////
goto no_send_exit;
}
}
case CACHE_UPDATE:
case CACHE_REMOVE:
case CACHE_DEREF:
{
//////////////////////
// Use short format //
//////////////////////
msg = data;
CacheOpMsg_short *m = (CacheOpMsg_short *) msg;
m->init();
m->opcode = opcode;
m->frag_type = ((CacheOpArgs_Deref *) args)->frag_type;
m->cfl_flags = ((CacheOpArgs_Deref *) args)->cfl_flags;
if (opcode == CACHE_DEREF)
m->md5 = *((CacheOpArgs_Deref *) args)->md5;
else
m->md5 = *((CacheOpArgs_General *) args)->url_md5;
m->seq_number = (c ? cc->seq_number : CACHE_NO_RESPONSE);
break;
}
case CACHE_LINK:
{
////////////////////////
// Use short_2 format //
////////////////////////
msg = data;
CacheOpMsg_short_2 *m = (CacheOpMsg_short_2 *) msg;
m->init();
m->opcode = opcode;
m->cfl_flags = ((CacheOpArgs_Link *) args)->cfl_flags;
m->md5_1 = *((CacheOpArgs_Link *) args)->from;
m->md5_2 = *((CacheOpArgs_Link *) args)->to;
m->seq_number = (c ? cc->seq_number : CACHE_NO_RESPONSE);
m->frag_type = ((CacheOpArgs_Link *) args)->frag_type;
break;
}
default:
msg = 0;
break;
}
#ifdef CACHE_MSG_TRACE
log_cache_op_sndmsg((c ? cc->seq_number : CACHE_NO_RESPONSE), 0, "do_op");
#endif
clusterProcessor.invoke_remote(ch,
op_needs_marshalled_coi(opcode) ? CACHE_OP_MALLOCED_CLUSTER_FUNCTION
: CACHE_OP_CLUSTER_FUNCTION, (char *) msg, data_len);
no_send_exit:
if (c) {
return act;
} else {
return (Action *) 0;
}
}
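/////////////////////////////////////////////////////////////////////////
// setup_local_vc()
// Establish the local side of the ClusterVConnection for a pending
// cache op. Returns 1 if the VC was established (caller sends the
// request), 0 if setup failed (request aborted, failure callback
// posted), or -1 if the open was delayed (completion via callback).
/////////////////////////////////////////////////////////////////////////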
int
CacheContinuation::setup_local_vc(char *data, int data_len, CacheContinuation * cc, ClusterMachine * mp, Action ** act)
{
bool read_op = op_is_read(cc->request_opcode);
bool short_msg = op_is_shortform(cc->request_opcode);
// Alloc buffer, copy message and attach to continuation
cc->setMsgBufferLen(data_len);
cc->allocMsgBuffer();
memcpy(cc->getMsgBuffer(), data, data_len);
SET_CONTINUATION_HANDLER(cc, (CacheContHandler)
& CacheContinuation::localVCsetupEvent);
if (short_msg) {
Debug("cache_proto", "open_local-s (%s) seqno=%d", (read_op ? "R" : "W"), ((CacheOpMsg_short *) data)->seq_number);
} else {
Debug("cache_proto", "open_local-l (%s) seqno=%d", (read_op ? "R" : "W"), ((CacheOpMsg_long *) data)->seq_number);
}
// Create local VC
ClusterVConnection *vc;
if (!read_op && (cc->request_opcode == CACHE_OPEN_WRITE_LONG)) {
// Determine if the open_write has already been established.
vc = cc->lookupOpenWriteVC();
} else {
vc = clusterProcessor.open_local(cc, mp, cc->open_local_token,
(CLUSTER_OPT_ALLOW_IMMEDIATE |
(read_op ? CLUSTER_OPT_CONN_READ : CLUSTER_OPT_CONN_WRITE)));
}
if (!vc) {
// Error, abort request
if (short_msg) {
Debug("cache_proto", "0open_local-s (%s) failed, seqno=%d",
(read_op ? "R" : "W"), ((CacheOpMsg_short *) data)->seq_number);
} else {
Debug("cache_proto", "1open_local-l (%s) failed, seqno=%d",
(read_op ? "R" : "W"), ((CacheOpMsg_long *) data)->seq_number);
}
cc->freeMsgBuffer();
if (cc->timeout)
cc->timeout->cancel();
cc->timeout = NULL;
// Post async failure callback on a different continuation.
*act = callback_failure(&cc->action, (read_op ? CACHE_EVENT_OPEN_READ_FAILED : CACHE_EVENT_OPEN_WRITE_FAILED), 0);
return 0;
} else if (vc != CLUSTER_DELAYED_OPEN) {
// We have established the VC
if (read_op) {
cc->read_cluster_vc = vc;
} else {
cc->write_cluster_vc = vc;
}
cc->cluster_vc_channel = vc->channel;
vc->current_cont = cc;
if (short_msg) {
CacheOpMsg_short *ms = (CacheOpMsg_short *) data;
ms->channel = vc->channel;
ms->token = cc->open_local_token;
Debug("cache_proto",
"0open_local-s (%s) success, seqno=%d chan=%d token=%d,%d VC=%p",
(read_op ? "R" : "W"), ms->seq_number, vc->channel, ms->token.ip_created, ms->token.sequence_number, vc);
} else {
CacheOpMsg_long *ml = (CacheOpMsg_long *) data;
ml->channel = vc->channel;
ml->token = cc->open_local_token;
Debug("cache_proto",
"1open_local-l (%s) success, seqno=%d chan=%d token=%d,%d VC=%p",
(read_op ? "R" : "W"), ml->seq_number, vc->channel, ml->token.ip_created, ml->token.sequence_number, vc);
}
cc->freeMsgBuffer();
SET_CONTINUATION_HANDLER(cc, (CacheContHandler)
& CacheContinuation::remoteOpEvent);
return 1;
} else {
//////////////////////////////////////////////////////
// Unable to setup VC, delay required, await callback
//////////////////////////////////////////////////////
return -1;
}
}
ClusterVConnection *
CacheContinuation::lookupOpenWriteVC()
{
///////////////////////////////////////////////////////////////
// See if we already have an open_write ClusterVConnection
// which was established in a previous remote open_read which
// failed.
///////////////////////////////////////////////////////////////
ClusterVConnection *vc;
CacheOpMsg_long *ml = (CacheOpMsg_long *) getMsgBuffer();
vc = GlobalOpenWriteVCcache->lookup(&ml->url_md5);
if (vc == ((ClusterVConnection *) 0)) {
// Retry lookup
SET_CONTINUATION_HANDLER(this, (CacheContHandler)
& CacheContinuation::lookupOpenWriteVCEvent);
//
// Note: In the lookupOpenWriteVCEvent handler, we use EVENT_IMMEDIATE
// to distinguish the lookup retry from a request timeout
// which uses EVENT_INTERVAL.
//
lookup_open_write_vc_event = eventProcessor.schedule_imm(this, ET_CACHE_CONT_SM);
} else if (vc != ((ClusterVConnection *) - 1)) {
// Hit, found open_write VC in cache.
// Post open_write completion by simulating a
// remote cache op result message.
vc->action_ = action; // establish new continuation
SET_CONTINUATION_HANDLER(this, (CacheContHandler)
& CacheContinuation::localVCsetupEvent);
this->handleEvent(CLUSTER_EVENT_OPEN_EXISTS, vc);
CacheOpReplyMsg msg;
int msglen;
msglen = CacheOpReplyMsg::sizeof_fixedlen_msg();
msg.result = CACHE_EVENT_OPEN_WRITE;
msg.seq_number = seq_number;
msg.token = vc->token;
cache_op_result_ClusterFunction(ch, (void *) &msg, msglen);
} else {
// Miss, establish local VC and send remote open_write request
SET_CONTINUATION_HANDLER(this, (CacheContHandler)
& CacheContinuation::localVCsetupEvent);
vc = clusterProcessor.open_local(this, from, open_local_token,
(CLUSTER_OPT_ALLOW_IMMEDIATE | CLUSTER_OPT_CONN_WRITE));
if (!vc) {
this->handleEvent(CLUSTER_EVENT_OPEN_FAILED, 0);
} else if (vc != CLUSTER_DELAYED_OPEN) {
this->handleEvent(CLUSTER_EVENT_OPEN, vc);
}
}
return CLUSTER_DELAYED_OPEN; // force completion in callback
}
int
CacheContinuation::lookupOpenWriteVCEvent(int event, Event * e)
{
if (event == EVENT_IMMEDIATE) {
// Retry open_write VC lookup
lookupOpenWriteVC();
} else {
lookup_open_write_vc_event->cancel();
SET_CONTINUATION_HANDLER(this, (CacheContHandler)
& CacheContinuation::localVCsetupEvent);
this->handleEvent(event, e);
}
return EVENT_DONE;
}
int
CacheContinuation::remove_and_delete(int /* event ATS_UNUSED */, Event * e)
{
unsigned int hash = FOLDHASH(target_ip, seq_number);
MUTEX_TRY_LOCK(queuelock, remoteCacheContQueueMutex[hash], this_ethread());
if (queuelock) {
if (remoteCacheContQueue[hash].in(this)) {
remoteCacheContQueue[hash].remove(this);
}
MUTEX_RELEASE(queuelock);
if (use_deferred_callback)
callback_failure(&action, result, result_error, this);
else
cacheContAllocator_free(this);
} else {
SET_HANDLER((CacheContHandler) & CacheContinuation::remove_and_delete);
if (!e) {
timeout = eventProcessor.schedule_in(this, cache_cluster_timeout, ET_CACHE_CONT_SM);
} else {
e->schedule_in(cache_cluster_timeout);
}
}
return EVENT_DONE;
}
int
CacheContinuation::localVCsetupEvent(int event, ClusterVConnection * vc)
{
ink_assert(magicno == (int) MagicNo);
ink_assert(getMsgBuffer());
bool short_msg = op_is_shortform(request_opcode);
bool read_op = op_is_read(request_opcode);
if (event == EVENT_INTERVAL) {
Event *e = (Event *) vc;
unsigned int hash = FOLDHASH(target_ip, seq_number);
MUTEX_TRY_LOCK(queuelock, remoteCacheContQueueMutex[hash], e->ethread);
if (!queuelock) {
e->schedule_in(CACHE_RETRY_PERIOD);
return EVENT_CONT;
}
if (!remoteCacheContQueue[hash].in(this)) {
////////////////////////////////////////////////////
// Not yet queued on outstanding operations list
////////////////////////////////////////////////////
remoteCacheContQueue[hash].enqueue(this);
ink_assert(timeout == e);
MUTEX_RELEASE(queuelock);
e->schedule_in(cache_cluster_timeout);
return EVENT_CONT;
} else {
/////////////////////////////////////////////////////
// Timeout occurred
/////////////////////////////////////////////////////
remoteCacheContQueue[hash].remove(this);
MUTEX_RELEASE(queuelock);
Debug("cluster_timeout", "0cluster op timeout %d", seq_number);
CLUSTER_INCREMENT_DYN_STAT(CLUSTER_REMOTE_OP_TIMEOUTS_STAT);
timeout = (Event *) 1; // low-bit flag: note that a timeout occurred
/////////////////////////////////////////////////////////////////
// Note: Failure callback is sent now, but the deallocation of
// the CacheContinuation is deferred until we receive the
// open_local() callback.
/////////////////////////////////////////////////////////////////
if (!action.cancelled)
action.continuation->handleEvent((read_op ? CACHE_EVENT_OPEN_READ_FAILED : CACHE_EVENT_OPEN_WRITE_FAILED), 0);
return EVENT_DONE;
}
} else if (((event == CLUSTER_EVENT_OPEN) || (event == CLUSTER_EVENT_OPEN_EXISTS))
&& (((ptrdiff_t) timeout & (ptrdiff_t) 1) == 0)) {
ink_hrtime now;
now = ink_get_hrtime();
CLUSTER_SUM_DYN_STAT(CLUSTER_OPEN_DELAY_TIME_STAT, now - start_time);
LOG_EVENT_TIME(start_time, open_delay_time_dist, open_delay_events);
if (read_op) {
read_cluster_vc = vc;
} else {
write_cluster_vc = vc;
}
cluster_vc_channel = vc->channel;
vc->current_cont = this;
if (short_msg) {
CacheOpMsg_short *ms = (CacheOpMsg_short *) getMsgBuffer();
ms->channel = vc->channel;
ms->token = open_local_token;
Debug("cache_proto",
"2open_local-s (%s) success, seqno=%d chan=%d token=%d,%d VC=%p",
(read_op ? "R" : "W"), ms->seq_number, vc->channel, ms->token.ip_created, ms->token.sequence_number, vc);
} else {
CacheOpMsg_long *ml = (CacheOpMsg_long *) getMsgBuffer();
ml->channel = vc->channel;
ml->token = open_local_token;
Debug("cache_proto",
"3open_local-l (%s) success, seqno=%d chan=%d token=%d,%d VC=%p",
(read_op ? "R" : "W"), ml->seq_number, vc->channel, ml->token.ip_created, ml->token.sequence_number, vc);
}
SET_HANDLER((CacheContHandler) & CacheContinuation::remoteOpEvent);
if (event != CLUSTER_EVENT_OPEN_EXISTS) {
// Send request message
clusterProcessor.invoke_remote(ch,
(op_needs_marshalled_coi(request_opcode) ?
CACHE_OP_MALLOCED_CLUSTER_FUNCTION :
CACHE_OP_CLUSTER_FUNCTION), (char *) getMsgBuffer(), getMsgBufferLen());
}
} else {
int send_failure_callback = 1;
if (((ptrdiff_t) timeout & (ptrdiff_t) 1) == 0) {
if (short_msg) {
Debug("cache_proto", "2open_local-s (%s) failed, seqno=%d",
(read_op ? "R" : "W"), ((CacheOpMsg_short *) getMsgBuffer())->seq_number);
} else {
Debug("cache_proto", "3open_local-l (%s) failed, seqno=%d",
(read_op ? "R" : "W"), ((CacheOpMsg_long *) getMsgBuffer())->seq_number);
}
} else {
Debug("cache_proto", "4open_local cancelled due to timeout, seqno=%d", seq_number);
this->timeout = 0;
// Deallocate VC if successfully acquired
if (event == CLUSTER_EVENT_OPEN) {
vc->pending_remote_fill = 0;
vc->remote_closed = 1; // avoid remote close msg
vc->do_io(VIO::CLOSE);
}
send_failure_callback = 0; // already sent.
}
if (this->timeout)
this->timeout->cancel();
this->timeout = NULL;
freeMsgBuffer();
if (send_failure_callback) {
//
// Action corresponding to "this" already sent back to user,
// use "this" to establish the failure callback after
// removing ourselves from the active list.
//
this->use_deferred_callback = true;
this->result = (read_op ? CACHE_EVENT_OPEN_READ_FAILED : CACHE_EVENT_OPEN_WRITE_FAILED);
this->result_error = 0;
remove_and_delete(0, (Event *) 0);
} else {
cacheContAllocator_free(this);
}
return EVENT_DONE;
}
// Free message
freeMsgBuffer();
return EVENT_DONE;
}
///////////////////////////////////////////////////////////////////////////
// cache_op_ClusterFunction()
// On the receiving side, handle a general cluster cache operation
///////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////
// Marshaling functions for OTW message headers
////////////////////////////////////////////////////////////////////////
inline CacheOpMsg_long *
unmarshal_CacheOpMsg_long(void *data, int NeedByteSwap)
{
if (NeedByteSwap)
((CacheOpMsg_long *) data)->SwapBytes();
return (CacheOpMsg_long *) data;
}
inline CacheOpMsg_short *
unmarshal_CacheOpMsg_short(void *data, int NeedByteSwap)
{
if (NeedByteSwap)
((CacheOpMsg_short *) data)->SwapBytes();
return (CacheOpMsg_short *) data;
}
inline CacheOpMsg_short_2 *
unmarshal_CacheOpMsg_short_2(void *data, int NeedByteSwap)
{
if (NeedByteSwap)
((CacheOpMsg_short_2 *) data)->SwapBytes();
return (CacheOpMsg_short_2 *) data;
}
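// Receive-path convention: each handler unmarshals its header in place,
// byte-swapping only when the sender's byte order differs, e.g.:
//   CacheOpMsg_short *msg = unmarshal_CacheOpMsg_short(data, mh->NeedByteSwap());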
// init_from_long() support routine for cache_op_ClusterFunction()
inline void
init_from_long(CacheContinuation * cont, CacheOpMsg_long * msg, ClusterMachine * m)
{
cont->no_reply_message = (msg->seq_number == CACHE_NO_RESPONSE);
cont->seq_number = msg->seq_number;
cont->cfl_flags = msg->cfl_flags;
cont->from = m;
cont->url_md5 = msg->url_md5;
cont->cluster_vc_channel = msg->channel;
cont->frag_type = (CacheFragType) msg->frag_type;
if ((cont->request_opcode == CACHE_OPEN_WRITE_LONG)
|| (cont->request_opcode == CACHE_OPEN_READ_LONG)) {
cont->pin_in_cache = (time_t) msg->data;
} else {
cont->pin_in_cache = 0;
}
cont->token = msg->token;
cont->nbytes = (((int) msg->nbytes < 0) ? 0 : msg->nbytes);
if (cont->request_opcode == CACHE_OPEN_READ_LONG) {
cont->caller_buf_freebytes = msg->buffer_size;
} else {
cont->caller_buf_freebytes = 0;
}
}
// init_from_short() support routine for cache_op_ClusterFunction()
inline void
init_from_short(CacheContinuation * cont, CacheOpMsg_short * msg, ClusterMachine * m)
{
cont->no_reply_message = (msg->seq_number == CACHE_NO_RESPONSE);
cont->seq_number = msg->seq_number;
cont->cfl_flags = msg->cfl_flags;
cont->from = m;
cont->url_md5 = msg->md5;
cont->cluster_vc_channel = msg->channel;
cont->token = msg->token;
cont->nbytes = (((int) msg->nbytes < 0) ? 0 : msg->nbytes);
cont->frag_type = (CacheFragType) msg->frag_type;
if (cont->request_opcode == CACHE_OPEN_WRITE) {
cont->pin_in_cache = (time_t) msg->data;
} else {
cont->pin_in_cache = 0;
}
if (cont->request_opcode == CACHE_OPEN_READ) {
cont->caller_buf_freebytes = msg->buffer_size;
} else {
cont->caller_buf_freebytes = 0;
}
}
// init_from_short_2() support routine for cache_op_ClusterFunction()
inline void
init_from_short_2(CacheContinuation * cont, CacheOpMsg_short_2 * msg, ClusterMachine * m)
{
cont->no_reply_message = (msg->seq_number == CACHE_NO_RESPONSE);
cont->seq_number = msg->seq_number;
cont->cfl_flags = msg->cfl_flags;
cont->from = m;
cont->url_md5 = msg->md5_1;
cont->frag_type = (CacheFragType) msg->frag_type;
}
void
cache_op_ClusterFunction(ClusterHandler * ch, void *data, int len)
{
EThread *thread = this_ethread();
ProxyMutex *mutex = thread->mutex;
////////////////////////////////////////////////////////
// Note: we are running on the ET_CLUSTER thread
////////////////////////////////////////////////////////
CLUSTER_INCREMENT_DYN_STAT(CLUSTER_CACHE_OUTSTANDING_STAT);
int opcode;
ClusterMessageHeader *mh = (ClusterMessageHeader *) data;
if (mh->GetMsgVersion() != CacheOpMsg_long::CACHE_OP_LONG_MESSAGE_VERSION) {
////////////////////////////////////////////////
// Convert from old to current message format
////////////////////////////////////////////////
ink_release_assert(!"cache_op_ClusterFunction() bad msg version");
}
opcode = ((CacheOpMsg_long *) data)->opcode;
// If necessary, create a continuation to reflect the response back
CacheContinuation *c = CacheContinuation::cacheContAllocator_alloc();
c->mutex = new_ProxyMutex();
MUTEX_TRY_LOCK(lock, c->mutex, this_ethread());
c->request_opcode = opcode;
c->token.clear();
c->start_time = ink_get_hrtime();
c->ch = ch;
SET_CONTINUATION_HANDLER(c, (CacheContHandler)
& CacheContinuation::replyOpEvent);
switch (opcode) {
case CACHE_OPEN_WRITE_BUFFER:
case CACHE_OPEN_WRITE_BUFFER_LONG:
ink_release_assert(!"cache_op_ClusterFunction WRITE_BUFFER not supported");
break;
case CACHE_OPEN_READ_BUFFER:
case CACHE_OPEN_READ_BUFFER_LONG:
ink_release_assert(!"cache_op_ClusterFunction READ_BUFFER not supported");
break;
case CACHE_OPEN_READ:
{
CacheOpMsg_short *msg = unmarshal_CacheOpMsg_short(data, mh->NeedByteSwap());
init_from_short(c, msg, ch->machine);
Debug("cache_msg",
"cache_op-s op=%d seqno=%d data=%p len=%d machine=%p", opcode, c->seq_number, data, len, ch->machine);
//
// Establish the remote side of the ClusterVConnection
//
c->write_cluster_vc = clusterProcessor.connect_local((Continuation *) 0,
&c->token,
c->cluster_vc_channel,
(CLUSTER_OPT_IMMEDIATE | CLUSTER_OPT_CONN_READ));
if (!c->write_cluster_vc) {
// Unable to setup channel, abort processing.
CLUSTER_INCREMENT_DYN_STAT(CLUSTER_CHAN_INUSE_STAT);
Debug("chan_inuse",
"1Remote chan=%d inuse tok.ip=%u.%u.%u.%u tok.seqno=%d seqno=%d",
c->cluster_vc_channel, DOT_SEPARATED(c->token.ip_created), c->token.sequence_number, c->seq_number);
// Send cluster op failed reply
c->replyOpEvent(CACHE_EVENT_OPEN_READ_FAILED, (VConnection *) - ECLUSTER_CHANNEL_INUSE);
break;
} else {
c->write_cluster_vc->current_cont = c;
}
ink_release_assert(c->write_cluster_vc != CLUSTER_DELAYED_OPEN);
ink_release_assert((opcode == CACHE_OPEN_READ)
|| c->write_cluster_vc->pending_remote_fill);
SET_CONTINUATION_HANDLER(c, (CacheContHandler)
& CacheContinuation::setupVCdataRead);
Debug("cache_proto",
"0read op, seqno=%d chan=%d bufsize=%d token=%d,%d",
msg->seq_number, msg->channel, msg->buffer_size, msg->token.ip_created, msg->token.sequence_number);
#ifdef CACHE_MSG_TRACE
log_cache_op_msg(msg->seq_number, len, "cache_op_open_read");
#endif
CacheKey key(msg->md5);
char *hostname = NULL;
int host_len = len - op_to_sizeof_fixedlen_msg(opcode);
if (host_len) {
hostname = (char *) msg->moi.byte;
}
Cache *call_cache = caches[c->frag_type];
c->cache_action = call_cache->open_read(c, &key, c->frag_type, hostname, host_len);
break;
}
case CACHE_OPEN_READ_LONG:
{
// Cache needs message data, copy it.
c->setMsgBufferLen(len);
c->allocMsgBuffer();
memcpy(c->getMsgBuffer(), (char *) data, len);
int flen = CacheOpMsg_long::sizeof_fixedlen_msg();
CacheOpMsg_long *msg = unmarshal_CacheOpMsg_long(c->getMsgBuffer(), mh->NeedByteSwap());
init_from_long(c, msg, ch->machine);
Debug("cache_msg",
"cache_op-l op=%d seqno=%d data=%p len=%d machine=%p", opcode, c->seq_number, data, len, ch->machine);
#ifdef CACHE_MSG_TRACE
log_cache_op_msg(msg->seq_number, len, "cache_op_open_read_long");
#endif
//
// Establish the remote side of the ClusterVConnection
//
c->write_cluster_vc = clusterProcessor.connect_local((Continuation *) 0,
&c->token,
c->cluster_vc_channel,
(CLUSTER_OPT_IMMEDIATE | CLUSTER_OPT_CONN_READ));
if (!c->write_cluster_vc) {
// Unable to setup channel, abort processing.
CLUSTER_INCREMENT_DYN_STAT(CLUSTER_CHAN_INUSE_STAT);
Debug("chan_inuse",
"2Remote chan=%d inuse tok.ip=%u.%u.%u.%u tok.seqno=%d seqno=%d",
c->cluster_vc_channel, DOT_SEPARATED(c->token.ip_created), c->token.sequence_number, c->seq_number);
// Send cluster op failed reply
c->replyOpEvent(CACHE_EVENT_OPEN_READ_FAILED, (VConnection *) - ECLUSTER_CHANNEL_INUSE);
break;
} else {
c->write_cluster_vc->current_cont = c;
}
ink_release_assert(c->write_cluster_vc != CLUSTER_DELAYED_OPEN);
ink_release_assert((opcode == CACHE_OPEN_READ_LONG)
|| c->write_cluster_vc->pending_remote_fill);
SET_CONTINUATION_HANDLER(c, (CacheContHandler)
& CacheContinuation::setupReadWriteVC);
Debug("cache_proto",
"1read op, seqno=%d chan=%d bufsize=%d token=%d,%d",
msg->seq_number, msg->channel, msg->buffer_size, msg->token.ip_created, msg->token.sequence_number);
const char *p = (const char *) msg + flen;
int moi_len = len - flen;
int res;
ink_assert(moi_len > 0);
// Unmarshal CacheHTTPHdr
res = c->ic_request.unmarshal((char *) p, moi_len, NULL);
ink_assert(res > 0);
ink_assert(c->ic_request.valid());
c->request_purge = c->ic_request.method_get_wksidx() == HTTP_WKSIDX_PURGE || c->ic_request.method_get_wksidx() == HTTP_WKSIDX_DELETE;
moi_len -= res;
p += res;
ink_assert(moi_len > 0);
// Unmarshal CacheLookupHttpConfig
c->ic_params = new(CacheLookupHttpConfigAllocator.alloc())
CacheLookupHttpConfig();
res = c->ic_params->unmarshal(&c->ic_arena, (const char *) p, moi_len);
ink_assert(res > 0);
moi_len -= res;
p += res;
CacheKey key(msg->url_md5);
char *hostname = NULL;
int host_len = 0;
if (moi_len) {
hostname = (char *) p;
host_len = moi_len;
// Save hostname and attach it to the continuation since we may
// need it if we convert this to an open_write.
c->ic_hostname = new_IOBufferData(iobuffer_size_to_index(host_len));
c->ic_hostname_len = host_len;
memcpy(c->ic_hostname->data(), hostname, host_len);
}
Cache *call_cache = caches[c->frag_type];
Action *a = call_cache->open_read(c, &key, &c->ic_request,
c->ic_params,
c->frag_type, hostname, host_len);
// If open_read() completed inline (ACTION_RESULT_DONE), 'c' may already
// be freed, so only touch it otherwise (this also quiets Purify warnings).
if (a != ACTION_RESULT_DONE) {
c->cache_action = a;
}
break;
}
case CACHE_OPEN_WRITE:
{
CacheOpMsg_short *msg = unmarshal_CacheOpMsg_short(data, mh->NeedByteSwap());
init_from_short(c, msg, ch->machine);
Debug("cache_msg",
"cache_op-s op=%d seqno=%d data=%p len=%d machine=%p", opcode, c->seq_number, data, len, ch->machine);
#ifdef CACHE_MSG_TRACE
log_cache_op_msg(msg->seq_number, len, "cache_op_open_write");
#endif
//
// Establish the remote side of the ClusterVConnection
//
c->read_cluster_vc = clusterProcessor.connect_local((Continuation *) 0,
&c->token,
c->cluster_vc_channel,
(CLUSTER_OPT_IMMEDIATE | CLUSTER_OPT_CONN_WRITE));
if (!c->read_cluster_vc) {
// Unable to setup channel, abort processing.
CLUSTER_INCREMENT_DYN_STAT(CLUSTER_CHAN_INUSE_STAT);
Debug("chan_inuse",
"3Remote chan=%d inuse tok.ip=%u.%u.%u.%u tok.seqno=%d seqno=%d",
c->cluster_vc_channel, DOT_SEPARATED(c->token.ip_created), c->token.sequence_number, c->seq_number);
// Send cluster op failed reply
c->replyOpEvent(CACHE_EVENT_OPEN_WRITE_FAILED, (VConnection *) - ECLUSTER_CHANNEL_INUSE);
break;
} else {
c->read_cluster_vc->current_cont = c;
}
ink_release_assert(c->read_cluster_vc != CLUSTER_DELAYED_OPEN);
CacheKey key(msg->md5);
char *hostname = NULL;
int host_len = len - op_to_sizeof_fixedlen_msg(opcode);
if (host_len) {
hostname = (char *) msg->moi.byte;
}
Cache *call_cache = caches[c->frag_type];
Action *a = call_cache->open_write(c, &key, c->frag_type,
!!(c->cfl_flags & CFL_OVERWRITE_ON_WRITE),
c->pin_in_cache, hostname, host_len);
if (a != ACTION_RESULT_DONE) {
c->cache_action = a;
}
break;
}
case CACHE_OPEN_WRITE_LONG:
{
// Cache needs message data, copy it.
c->setMsgBufferLen(len);
c->allocMsgBuffer();
memcpy(c->getMsgBuffer(), (char *) data, len);
int flen = CacheOpMsg_long::sizeof_fixedlen_msg();
CacheOpMsg_long *msg = unmarshal_CacheOpMsg_long(c->getMsgBuffer(), mh->NeedByteSwap());
init_from_long(c, msg, ch->machine);
Debug("cache_msg",
"cache_op-l op=%d seqno=%d data=%p len=%d machine=%p", opcode, c->seq_number, data, len, ch->machine);
#ifdef CACHE_MSG_TRACE
log_cache_op_msg(msg->seq_number, len, "cache_op_open_write_long");
#endif
//
// Establish the remote side of the ClusterVConnection
//
c->read_cluster_vc = clusterProcessor.connect_local((Continuation *) 0,
&c->token,
c->cluster_vc_channel,
(CLUSTER_OPT_IMMEDIATE | CLUSTER_OPT_CONN_WRITE));
if (!c->read_cluster_vc) {
// Unable to setup channel, abort processing.
CLUSTER_INCREMENT_DYN_STAT(CLUSTER_CHAN_INUSE_STAT);
Debug("chan_inuse",
"4Remote chan=%d inuse tok.ip=%u.%u.%u.%u tok.seqno=%d seqno=%d",
c->cluster_vc_channel, DOT_SEPARATED(c->token.ip_created), c->token.sequence_number, c->seq_number);
// Send cluster op failed reply
c->replyOpEvent(CACHE_EVENT_OPEN_WRITE_FAILED, (VConnection *) - ECLUSTER_CHANNEL_INUSE);
break;
} else {
c->read_cluster_vc->current_cont = c;
}
ink_release_assert(c->read_cluster_vc != CLUSTER_DELAYED_OPEN);
CacheHTTPInfo *ci = 0;
const char *p = (const char *) msg + flen;
int res = 0;
int moi_len = len - flen;
if (moi_len && c->cfl_flags & CFL_LOPENWRITE_HAVE_OLDINFO) {
// Unmarshal old CacheHTTPInfo
res = HTTPInfo::unmarshal((char *) p, moi_len, NULL);
ink_assert(res > 0);
c->ic_old_info.get_handle((char *) p, moi_len);
ink_assert(c->ic_old_info.valid());
ci = &c->ic_old_info;
}
if (c->cfl_flags & CFL_ALLOW_MULTIPLE_WRITES) {
ink_assert(!ci);
ci = (CacheHTTPInfo *) CACHE_ALLOW_MULTIPLE_WRITES;
}
moi_len -= res;
p += res;
CacheKey key(msg->url_md5);
char *hostname = NULL;
if (moi_len) {
hostname = (char *) p;
}
Cache *call_cache = caches[c->frag_type];
Action *a = call_cache->open_write(c, &key, ci, c->pin_in_cache,
NULL, c->frag_type, hostname, moi_len);
if (a != ACTION_RESULT_DONE) {
c->cache_action = a;
}
break;
}
case CACHE_REMOVE:
{
CacheOpMsg_short *msg = unmarshal_CacheOpMsg_short(data, mh->NeedByteSwap());
init_from_short(c, msg, ch->machine);
Debug("cache_msg",
"cache_op op=%d seqno=%d data=%p len=%d machine=%p", opcode, c->seq_number, data, len, ch->machine);
#ifdef CACHE_MSG_TRACE
log_cache_op_msg(msg->seq_number, len, "cache_op_remove");
#endif
CacheKey key(msg->md5);
char *hostname = NULL;
int host_len = len - op_to_sizeof_fixedlen_msg(opcode);
if (host_len) {
hostname = (char *) msg->moi.byte;
}
Cache *call_cache = caches[c->frag_type];
Action *a = call_cache->remove(c, &key, c->frag_type,
!!(c->cfl_flags & CFL_REMOVE_USER_AGENTS),
!!(c->cfl_flags & CFL_REMOVE_LINK),
hostname, host_len);
if (a != ACTION_RESULT_DONE) {
c->cache_action = a;
}
break;
}
case CACHE_LINK:
{
CacheOpMsg_short_2 *msg = unmarshal_CacheOpMsg_short_2(data, mh->NeedByteSwap());
init_from_short_2(c, msg, ch->machine);
Debug("cache_msg",
"cache_op op=%d seqno=%d data=%p len=%d machine=%p", opcode, c->seq_number, data, len, ch->machine);
#ifdef CACHE_MSG_TRACE
log_cache_op_msg(msg->seq_number, len, "cache_op_link");
#endif
CacheKey key1(msg->md5_1);
CacheKey key2(msg->md5_2);
char *hostname = NULL;
int host_len = len - op_to_sizeof_fixedlen_msg(opcode);
if (host_len) {
hostname = (char *) msg->moi.byte;
}
Cache *call_cache = caches[c->frag_type];
Action *a = call_cache->link(c, &key1, &key2, c->frag_type,
hostname, host_len);
if (a != ACTION_RESULT_DONE) {
c->cache_action = a;
}
break;
}
case CACHE_DEREF:
{
CacheOpMsg_short *msg = unmarshal_CacheOpMsg_short(data, mh->NeedByteSwap());
init_from_short(c, msg, ch->machine);
Debug("cache_msg",
"cache_op op=%d seqno=%d data=%p len=%d machine=%p", opcode, c->seq_number, data, len, ch->machine);
#ifdef CACHE_MSG_TRACE
log_cache_op_msg(msg->seq_number, len, "cache_op_deref");
#endif
CacheKey key(msg->md5);
char *hostname = NULL;
int host_len = len - op_to_sizeof_fixedlen_msg(opcode);
if (host_len) {
hostname = (char *) msg->moi.byte;
}
Cache *call_cache = caches[c->frag_type];
Action *a = call_cache->deref(c, &key, c->frag_type,
hostname, host_len);
if (a != ACTION_RESULT_DONE) {
c->cache_action = a;
}
break;
}
default:
{
ink_release_assert(0);
}
} // End of switch
}
void
cache_op_malloc_ClusterFunction(ClusterHandler *ch, void *data, int len)
{
cache_op_ClusterFunction(ch, data, len);
// We own the message data, free it back to the Cluster subsystem
clusterProcessor.free_remote_data((char *) data, len);
}
int
CacheContinuation::setupVCdataRead(int event, VConnection * vc)
{
ink_assert(magicno == (int) MagicNo);
//
// Setup the initial data read for the given Cache VC.
// This data is sent back in the response message.
//
if (event == CACHE_EVENT_OPEN_READ) {
//////////////////////////////////////////
// Allocate buffer and initiate read.
//////////////////////////////////////////
Debug("cache_proto", "setupVCdataRead CACHE_EVENT_OPEN_READ seqno=%d", seq_number);
ink_release_assert(caller_buf_freebytes);
SET_HANDLER((CacheContHandler) & CacheContinuation::VCdataRead);
int64_t size_index = iobuffer_size_to_index(caller_buf_freebytes);
MIOBuffer *buf = new_MIOBuffer(size_index);
readahead_reader = buf->alloc_reader();
MUTEX_TRY_LOCK(lock, mutex, this_ethread()); // prevent immediate callback
readahead_vio = vc->do_io_read(this, caller_buf_freebytes, buf);
return EVENT_DONE;
} else {
// Error case, deflect processing to replyOpEvent.
SET_HANDLER((CacheContHandler) & CacheContinuation::replyOpEvent);
return handleEvent(event, vc);
}
}
int
CacheContinuation::VCdataRead(int event, VIO * target_vio)
{
ink_release_assert(magicno == (int) MagicNo);
ink_release_assert(readahead_vio == target_vio);
VConnection *vc = target_vio->vc_server;
int reply = CACHE_EVENT_OPEN_READ;
int32_t object_size;
switch (event) {
case VC_EVENT_EOS:
{
if (!target_vio->ndone) {
// Doc with zero byte body, handle as read failure
goto read_failed;
}
// Fall through
}
case VC_EVENT_READ_READY:
case VC_EVENT_READ_COMPLETE:
{
int clone_bytes;
int current_ndone = target_vio->ndone;
ink_assert(current_ndone);
ink_assert(current_ndone <= readahead_reader->read_avail());
object_size = getObjectSize(vc, request_opcode, &cache_vc_info);
have_all_data = ((object_size <= caller_buf_freebytes) && (object_size == current_ndone));
// Use no more than the caller's max buffer limit
clone_bytes = current_ndone;
if (!have_all_data) {
if (current_ndone > caller_buf_freebytes) {
clone_bytes = caller_buf_freebytes;
}
}
// Clone data
IOBufferBlock *tail;
readahead_data = clone_IOBufferBlockList(readahead_reader->get_current_block(),
readahead_reader->start_offset, clone_bytes, &tail);
if (have_all_data) {
// Close VC, since no more data and also to avoid VC_EVENT_EOS
MIOBuffer *mbuf = target_vio->buffer.writer();
vc->do_io(VIO::CLOSE);
free_MIOBuffer(mbuf);
readahead_vio = 0;
}
SET_HANDLER((CacheContHandler) & CacheContinuation::replyOpEvent);
handleEvent(reply, vc);
return EVENT_CONT;
}
case VC_EVENT_ERROR:
case VC_EVENT_INACTIVITY_TIMEOUT:
case VC_EVENT_ACTIVE_TIMEOUT:
default:
{
read_failed:
// Read failed, deflect to replyOpEvent.
MIOBuffer * mbuf = target_vio->buffer.writer();
vc->do_io(VIO::CLOSE);
free_MIOBuffer(mbuf);
readahead_vio = 0;
reply = CACHE_EVENT_OPEN_READ_FAILED;
SET_HANDLER((CacheContHandler) & CacheContinuation::replyOpEvent);
handleEvent(reply, (VConnection *) - ECLUSTER_ORB_DATA_READ);
return EVENT_DONE;
}
} // End of switch
}
int
CacheContinuation::setupReadWriteVC(int event, VConnection * vc)
{
// Only handles OPEN_READ_LONG processing.
switch (event) {
case CACHE_EVENT_OPEN_READ:
{
// setup readahead
SET_HANDLER((CacheContHandler) & CacheContinuation::setupVCdataRead);
return handleEvent(event, vc);
break;
}
case CACHE_EVENT_OPEN_READ_FAILED:
{
if (frag_type == CACHE_FRAG_TYPE_HTTP && !request_purge) {
// HTTP open read failed, attempt open write now to avoid an additional
// message round trip
CacheKey key(url_md5);
Cache *call_cache = caches[frag_type];
Action *a = call_cache->open_write(this, &key, 0, pin_in_cache,
NULL, frag_type, ic_hostname ? ic_hostname->data() : NULL,
ic_hostname_len);
if (a != ACTION_RESULT_DONE) {
cache_action = a;
}
} else {
SET_HANDLER((CacheContHandler) & CacheContinuation::replyOpEvent);
return handleEvent(CACHE_EVENT_OPEN_READ_FAILED, 0);
}
break;
}
case CACHE_EVENT_OPEN_WRITE:
{
// Convert from read to write connection
ink_assert(!read_cluster_vc && write_cluster_vc);
read_cluster_vc = write_cluster_vc;
read_cluster_vc->set_type(CLUSTER_OPT_CONN_WRITE);
write_cluster_vc = 0;
SET_HANDLER((CacheContHandler) & CacheContinuation::replyOpEvent);
return handleEvent(event, vc);
break;
}
case CACHE_EVENT_OPEN_WRITE_FAILED:
default:
{
SET_HANDLER((CacheContHandler) & CacheContinuation::replyOpEvent);
return handleEvent(CACHE_EVENT_OPEN_READ_FAILED, 0);
break;
}
} // end of switch
return EVENT_DONE;
}
/////////////////////////////////////////////////////////////////////////
// replyOpEvent()
// Reflect the (local) reply back to the (remote) requesting node.
/////////////////////////////////////////////////////////////////////////
int
CacheContinuation::replyOpEvent(int event, VConnection * cvc)
{
ink_assert(magicno == (int) MagicNo);
Debug("cache_proto", "replyOpEvent(this=%p,event=%d,VC=%p)", this, event, cvc);
ink_hrtime now;
now = ink_get_hrtime();
CLUSTER_SUM_DYN_STAT(CLUSTER_CACHE_CALLBACK_TIME_STAT, now - start_time);
LOG_EVENT_TIME(start_time, callback_time_dist, cache_callbacks);
ink_release_assert(expect_cache_callback);
expect_cache_callback = false; // make sure we are called back exactly once
result = event;
bool open = event_is_open(event);
bool read_op = op_is_read(request_opcode);
bool open_read_now_open_write = false;
// Reply message initializations
CacheOpReplyMsg rmsg;
CacheOpReplyMsg *msg = &rmsg;
msg->result = event;
if ((request_opcode == CACHE_OPEN_READ_LONG)
&& cvc && (event == CACHE_EVENT_OPEN_WRITE)) {
//////////////////////////////////////////////////////////////////////////
// open read failed, but open write succeeded, set result to
// CACHE_EVENT_OPEN_READ_FAILED and make the result token non-zero to
// signal to the remote node that we have established a write connection.
//////////////////////////////////////////////////////////////////////////
msg->result = CACHE_EVENT_OPEN_READ_FAILED;
open_read_now_open_write = true;
}
msg->seq_number = seq_number;
int flen = CacheOpReplyMsg::sizeof_fixedlen_msg(); // include token
int len = 0;
int vers = 0;
int results_expected = 1; // completions to account for before freeing 'this'
if (no_reply_message) // CACHE_NO_RESPONSE request
goto free_exit;
if (open) {
// prepare for CACHE_OPEN_EVENT
results_expected = 2;
cache_vc = cvc;
cache_read = (event == CACHE_EVENT_OPEN_READ);
if (read_op && !open_read_now_open_write) {
ink_release_assert(write_cluster_vc->pending_remote_fill);
ink_assert(have_all_data || (readahead_vio == &((CacheVC *) cache_vc)->vio));
Debug("cache_proto", "connect_local success seqno=%d have_all_data=%d", seq_number, (have_all_data ? 1 : 0));
if (have_all_data) {
msg->token.clear(); // Tell sender no conn established
write_cluster_vc->type = VC_CLUSTER_WRITE;
} else {
msg->token = token; // Tell sender conn established
setupReadBufTunnel(cache_vc, write_cluster_vc);
}
} else {
Debug("cache_proto", "cache_open [%s] success seqno=%d", (cache_read ? "R" : "W"), seq_number);
msg->token = token; // Tell sender conn established
OneWayTunnel *pOWT = OneWayTunnel::OneWayTunnel_alloc();
pOWT->init(read_cluster_vc, cache_vc, NULL, nbytes ? nbytes : DEFAULT_MAX_BUFFER_SIZE, this->mutex);
read_cluster_vc->allow_remote_close();
results_expected--;
}
// For cache reads, marshal the associated CacheHTTPInfo in the reply
if (cache_read) {
int res;
msg->is_ram_cache_hit = ((CacheVC *)cache_vc)->is_ram_cache_hit();
if (!cache_vc_info.valid()) {
(void) getObjectSize(cache_vc, request_opcode, &cache_vc_info);
}
// Determine data length and allocate
len = cache_vc_info.marshal_length();
CacheOpReplyMsg *reply = (CacheOpReplyMsg *) ALLOCA_DOUBLE(flen + len);
// Initialize reply message header
*reply = *msg;
// Marshal response data into reply message
res = cache_vc_info.marshal((char *) reply + flen, len);
ink_assert(res >= 0 && res <= len);
// Make reply message the current message
msg = reply;
}
} else {
Debug("cache_proto", "cache operation failed result=%d seqno=%d (this=%p)", event, seq_number, this);
msg->token.clear(); // Tell sender no conn established
// Reallocate reply message, allowing for marshalled data
len += sizeof(int32_t);
CacheOpReplyMsg *reply = (CacheOpReplyMsg *) ALLOCA_DOUBLE(flen + len);
// Initialize reply message header
*reply = *msg;
if (request_opcode != CACHE_LINK) {
//
// open read/write failed, close preallocated VC
//
if (read_cluster_vc) {
read_cluster_vc->remote_closed = 1; // avoid remote close msg
read_cluster_vc->do_io(VIO::CLOSE);
}
if (write_cluster_vc) {
write_cluster_vc->pending_remote_fill = 0;
write_cluster_vc->remote_closed = 1; // avoid remote close msg
write_cluster_vc->do_io(VIO::CLOSE);
}
reply->moi.u32 = (int32_t) ((uintptr_t) cvc & 0xffffffff); // code describing failure
}
// Make reply message the current message
msg = reply;
}
CLUSTER_DECREMENT_DYN_STAT(CLUSTER_CACHE_OUTSTANDING_STAT);
//
// Send reply message
//
#ifdef CACHE_MSG_TRACE
log_cache_op_sndmsg(msg->seq_number, 0, "replyOpEvent");
#endif
vers = CacheOpReplyMsg::protoToVersion(from->msg_proto_major);
if (vers == CacheOpReplyMsg::CACHE_OP_REPLY_MESSAGE_VERSION) {
if (read_op) {
// Transmit reply message and object data in same cluster message
Debug("cache_proto", "Sending reply/data seqno=%d buflen=%" PRId64,
seq_number, readahead_data ? bytes_IOBufferBlockList(readahead_data, 1) : 0);
clusterProcessor.invoke_remote_data(ch,
CACHE_OP_RESULT_CLUSTER_FUNCTION,
(void *) msg, (flen + len),
readahead_data,
cluster_vc_channel, &token,
&CacheContinuation::disposeOfDataBuffer, (void *) this, CLUSTER_OPT_STEAL);
} else {
Debug("cache_proto", "Sending reply seqno=%d, (this=%p)", seq_number, this);
clusterProcessor.invoke_remote(ch, CACHE_OP_RESULT_CLUSTER_FUNCTION,
(void *) msg, (flen + len), CLUSTER_OPT_STEAL);
}
} else {
//////////////////////////////////////////////////////////////
// Create the specified down rev version of this message
//////////////////////////////////////////////////////////////
ink_release_assert(!"replyOpEvent() bad msg version");
}
free_exit:
results_expected--;
if (results_expected <= 0) {
Debug("cache_proto", "replyOpEvent: freeing this=%p", this);
cacheContAllocator_free(this);
}
return EVENT_DONE;
}
void
CacheContinuation::setupReadBufTunnel(VConnection * cache_read_vc, VConnection * cluster_write_vc)
{
////////////////////////////////////////////////////////////
// Setup OneWayTunnel and tunnel close event handler.
// Used in readahead processing on open read connections.
////////////////////////////////////////////////////////////
tunnel_cont = cacheContAllocator_alloc();
tunnel_cont->mutex = this->mutex;
SET_CONTINUATION_HANDLER(tunnel_cont, (CacheContHandler)
& CacheContinuation::tunnelClosedEvent);
int64_t ravail = bytes_IOBufferBlockList(readahead_data, 1);
tunnel_mutex = tunnel_cont->mutex;
tunnel_closed = false;
tunnel = OneWayTunnel::OneWayTunnel_alloc();
readahead_reader->consume(ravail); // allow for bytes sent in initial reply
tunnel->init(cache_read_vc, cluster_write_vc, tunnel_cont, readahead_vio, readahead_reader);
tunnel_cont->action = this;
tunnel_cont->tunnel = tunnel;
tunnel_cont->tunnel_cont = tunnel_cont;
// Disable cluster_write_vc
((ClusterVConnection *) cluster_write_vc)->write.enabled = 0;
// Disable cache read VC
readahead_vio->nbytes = readahead_vio->ndone;
/////////////////////////////////////////////////////////////////////
// At this point, the OneWayTunnel is blocked awaiting a reenable
// on both the source and target VCs. Reenable occurs after the
// message containing the initial data and open read reply are sent.
/////////////////////////////////////////////////////////////////////
}
///////////////////////////////////////////////////////////////////////
// Tunnel exited event handler, used for readahead on open read.
///////////////////////////////////////////////////////////////////////
int
CacheContinuation::tunnelClosedEvent(int /* event ATS_UNUSED */, void *c)
{
ink_assert(magicno == (int) MagicNo);
// Note: We are called with the tunnel_mutex held.
CacheContinuation *tc = (CacheContinuation *) c;
ink_release_assert(tc->tunnel_cont == tc);
CacheContinuation *real_cc = (CacheContinuation *) tc->action.continuation;
if (real_cc) {
// Notify the real continuation of the tunnel closed event
real_cc->tunnel = 0;
real_cc->tunnel_cont = 0;
real_cc->tunnel_closed = true;
}
OneWayTunnel::OneWayTunnel_free(tc->tunnel);
cacheContAllocator_free(tc);
return EVENT_DONE;
}
////////////////////////////////////////////////////////////
// Retry DisposeOfDataBuffer continuation
////////////////////////////////////////////////////////////
struct retryDisposeOfDataBuffer;
typedef int (retryDisposeOfDataBuffer::*rtryDisOfDBufHandler) (int, void *);
struct retryDisposeOfDataBuffer:public Continuation
{
CacheContinuation *c;
int handleRetryEvent(int event, Event * e)
{
if (CacheContinuation::handleDisposeEvent(event, c) == EVENT_DONE) {
delete this;
return EVENT_DONE;
} else {
e->schedule_in(HRTIME_MSECONDS(10));
return EVENT_CONT;
}
}
retryDisposeOfDataBuffer(CacheContinuation * cont)
: Continuation(new_ProxyMutex()), c(cont) {
SET_HANDLER((rtryDisOfDBufHandler)
& retryDisposeOfDataBuffer::handleRetryEvent);
}
};
//////////////////////////////////////////////////////////////////
// Callback from cluster to dispose of data passed in
// call to invoke_remote_data().
//////////////////////////////////////////////////////////////////
void
CacheContinuation::disposeOfDataBuffer(void *d)
{
ink_assert(d);
CacheContinuation *cc = (CacheContinuation *) d;
ink_assert(cc->have_all_data || cc->readahead_vio);
ink_assert(cc->have_all_data || (cc->readahead_vio == &((CacheVC *) cc->cache_vc)->vio));
if (cc->have_all_data) {
//
// All object data resides in the buffer, no OneWayTunnel
// started and the Cache VConnection has already been closed.
// Close write_cluster_vc and set remote close to avoid send of
// close message to remote node.
//
cc->write_cluster_vc->pending_remote_fill = 0;
cc->write_cluster_vc->remote_closed = 1;
cc->write_cluster_vc->do_io(VIO::CLOSE);
cc->readahead_data = 0;
cacheContAllocator_free(cc);
} else {
cc->write_cluster_vc->pending_remote_fill = 0;
cc->write_cluster_vc->allow_remote_close();
if (handleDisposeEvent(0, cc) == EVENT_CONT) {
// Setup retry continuation.
retryDisposeOfDataBuffer *retryCont = NEW(new retryDisposeOfDataBuffer(cc));
eventProcessor.schedule_in(retryCont, HRTIME_MSECONDS(10), ET_CALL);
}
}
}
int
CacheContinuation::handleDisposeEvent(int /* event ATS_UNUSED */, CacheContinuation * cc)
{
ink_assert(cc->magicno == (int) MagicNo);
MUTEX_TRY_LOCK(lock, cc->tunnel_mutex, this_ethread());
if (lock) {
// Write of initial object data is complete.
if (!cc->tunnel_closed) {
// Start tunnel by reenabling source and target VCs.
cc->tunnel->vioSource->nbytes = getObjectSize(cc->tunnel->vioSource->vc_server, cc->request_opcode, 0);
cc->tunnel->vioSource->reenable_re();
// Tunnel may be closed by vioSource->reenable_re(),
// we should check it again here:
if (!cc->tunnel_closed) {
cc->tunnel->vioTarget->reenable();
// Tell tunnel event we are gone
cc->tunnel_cont->action.continuation = 0;
}
}
cacheContAllocator_free(cc);
return EVENT_DONE;
} else {
// Lock acquire failed, retry operation.
return EVENT_CONT;
}
}
/////////////////////////////////////////////////////////////////////////////
// cache_op_result_ClusterFunction()
// Invoked on the machine which initiated a remote op, this
// unmarshals the result and calls a continuation in the requesting thread.
/////////////////////////////////////////////////////////////////////////////
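//
// Wire format handled here: a fixed-length CacheOpReplyMsg header
// (flen bytes), optionally followed by variable "moi" data -- a
// marshaled CacheHTTPInfo for CACHE_EVENT_OPEN_READ, or an int32_t
// error code for the *_FAILED results.
//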
void
cache_op_result_ClusterFunction(ClusterHandler *ch, void *d, int l)
{
////////////////////////////////////////////////////////
// Note: we are running on the ET_CACHE_CONT_SM thread
////////////////////////////////////////////////////////
// Copy reply message data
Ptr<IOBufferData> iob = make_ptr(new_IOBufferData(iobuffer_size_to_index(l)));
memcpy(iob->data(), (char *) d, l);
char *data = iob->data();
int flen, len = l;
CacheHTTPInfo ci;
CacheOpReplyMsg *msg = (CacheOpReplyMsg *) data;
int32_t op_result_error = 0;
ClusterMessageHeader *mh = (ClusterMessageHeader *) data;
if (mh->GetMsgVersion() != CacheOpReplyMsg::CACHE_OP_REPLY_MESSAGE_VERSION) {
////////////////////////////////////////////////
// Convert from old to current message format
////////////////////////////////////////////////
ink_release_assert(!"cache_op_result_ClusterFunction() bad msg version");
}
flen = CacheOpReplyMsg::sizeof_fixedlen_msg();
if (mh->NeedByteSwap())
msg->SwapBytes();
Debug("cluster_cache", "received cache op result, seqno=%d result=%d", msg->seq_number, msg->result);
// If applicable, unmarshal any response data
if ((len > flen) && event_reply_may_have_moi(msg->result)) {
switch (msg->result) {
case CACHE_EVENT_OPEN_READ:
{
char *p = (char *) msg + flen;
int res;
// Unmarshal CacheHTTPInfo
res = HTTPInfo::unmarshal(p, len, NULL);
ci.get_handle(p, len);
ink_assert(res > 0);
ink_assert(ci.valid());
break;
}
case CACHE_EVENT_LINK:
case CACHE_EVENT_LINK_FAILED:
break;
case CACHE_EVENT_OPEN_READ_FAILED:
case CACHE_EVENT_OPEN_WRITE_FAILED:
case CACHE_EVENT_REMOVE_FAILED:
case CACHE_EVENT_UPDATE_FAILED:
case CACHE_EVENT_DEREF_FAILED:
{
// Unmarshal the error code
ink_assert(((len - flen) == sizeof(int32_t)));
op_result_error = msg->moi.u32;
if (mh->NeedByteSwap())
ats_swap32((uint32_t *) & op_result_error);
op_result_error = -op_result_error;
break;
}
default:
{
ink_release_assert(!"invalid moi data for received msg");
break;
}
} // end of switch
}
// See if this response is still expected (the common case is yes)
unsigned int hash = FOLDHASH(ch->machine->ip, msg->seq_number);
EThread *thread = this_ethread();
ProxyMutex *mutex = thread->mutex;
if (MUTEX_TAKE_TRY_LOCK(remoteCacheContQueueMutex[hash], thread)) {
// Find it in pending list
CacheContinuation *c = find_cache_continuation(msg->seq_number,
ch->machine->ip);
if (!c) {
// Reply took too long, response no longer expected.
MUTEX_UNTAKE_LOCK(remoteCacheContQueueMutex[hash], thread);
Debug("cluster_timeout", "0cache reply timeout: %d", msg->seq_number);
CLUSTER_INCREMENT_DYN_STAT(CLUSTER_REMOTE_OP_REPLY_TIMEOUTS_STAT);
if (ci.valid())
ci.destroy();
return;
}
// Update remote ram cache hit flag
if (msg->result == CACHE_EVENT_OPEN_READ)
c->read_cluster_vc->set_ram_cache_hit(msg->is_ram_cache_hit);
// Try to send the message
MUTEX_TRY_LOCK(lock, c->mutex, thread);
// Failed to acquire lock, defer
if (!lock) {
MUTEX_UNTAKE_LOCK(remoteCacheContQueueMutex[hash], thread);
goto Lretry;
}
c->result_error = op_result_error;
// send message, release lock
c->freeMsgBuffer();
if (ci.valid()) {
// Unmarshaled CacheHTTPInfo contained in reply message, copy it.
c->setMsgBufferLen(len, iob);
c->ic_new_info = ci;
}
msg->seq_number = len; // HACK ALERT: seq_number reused to carry the message length
c->handleEvent(CACHE_EVENT_RESPONSE_MSG, data);
} else {
// Failed to wake it up, defer by creating a timed continuation
Lretry:
CacheContinuation * c = CacheContinuation::cacheContAllocator_alloc();
c->mutex = new_ProxyMutex();
c->seq_number = msg->seq_number;
c->target_ip = ch->machine->ip;
SET_CONTINUATION_HANDLER(c, (CacheContHandler)
& CacheContinuation::handleReplyEvent);
c->start_time = ink_get_hrtime();
c->result = msg->result;
if (event_is_open(msg->result))
c->token = msg->token;
if (ci.valid()) {
// Unmarshaled CacheHTTPInfo contained in reply message, copy it.
c->setMsgBufferLen(len, iob);
c->ic_new_info = ci;
}
c->result_error = op_result_error;
eventProcessor.schedule_in(c, CACHE_RETRY_PERIOD, ET_CACHE_CONT_SM);
}
}
////////////////////////////////////////////////////////////////////////
// handleReplyEvent()
// If we cannot acquire any of the locks to handle the response
// inline, it is deferred and later handled by this function.
////////////////////////////////////////////////////////////////////////
int
CacheContinuation::handleReplyEvent(int event, Event * e)
{
(void) event;
// take lock on outstanding message queue
EThread *t = e->ethread;
unsigned int hash = FOLDHASH(target_ip, seq_number);
if (!MUTEX_TAKE_TRY_LOCK(remoteCacheContQueueMutex[hash], t)) {
e->schedule_in(CACHE_RETRY_PERIOD);
return EVENT_CONT;
}
LOG_EVENT_TIME(start_time, cntlck_acquire_time_dist, cntlck_acquire_events);
// See if this response is still expected
CacheContinuation *c = find_cache_continuation(seq_number, target_ip);
if (c) {
// Acquire the lock to the continuation mutex
MUTEX_TRY_LOCK(lock, c->mutex, e->ethread);
if (!lock) {
// If we fail to acquire the lock, reschedule
MUTEX_UNTAKE_LOCK(remoteCacheContQueueMutex[hash], t);
e->schedule_in(CACHE_RETRY_PERIOD);
return EVENT_CONT;
}
// If unmarshaled CacheHTTPInfo exists, pass it along
if (ic_new_info.valid()) {
c->freeMsgBuffer();
c->setMsgBufferLen(getMsgBufferLen(), getMsgBufferIOBData());
c->ic_new_info = ic_new_info;
ic_new_info.clear();
}
// send message, release lock
c->handleEvent(CACHE_EVENT_RESPONSE, this);
} else {
MUTEX_UNTAKE_LOCK(remoteCacheContQueueMutex[hash], t);
Debug("cluster_timeout", "cache reply timeout: %d", seq_number);
CLUSTER_INCREMENT_DYN_STAT(CLUSTER_REMOTE_OP_REPLY_TIMEOUTS_STAT);
}
// Free this continuation
cacheContAllocator_free(this);
return EVENT_DONE;
}
//////////////////////////////////////////////////////////////////////////
// remoteOpEvent()
// On the requesting node, handle the timeout and response to the user.
// There may be two CacheContinuations involved:
// 1) One waiting to respond to the user. This is the
// CACHE_EVENT_RESPONSE_MSG case, which is handled
// inline (without delay).
// 2) One carrying the response from the remote machine, delayed
// waiting on a lock. This is the CACHE_EVENT_RESPONSE case.
//////////////////////////////////////////////////////////////////////////
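//
// Event flow, as implemented below:
// EVENT_INTERVAL (first delivery) - enqueue on the pending queue
// and arm the cluster timeout.
// EVENT_INTERVAL (second delivery) - genuine timeout; post
// ECLUSTER_OP_TIMEOUT to the user and defer deallocation.
// CACHE_EVENT_RESPONSE(_MSG) - reply arrived; cancel the timeout,
// dequeue and dispatch the result.
//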
int
CacheContinuation::remoteOpEvent(int event_code, Event * e)
{
ink_assert(magicno == (int) MagicNo);
int event = event_code;
ink_hrtime now;
if (start_time) {
int res;
if (event != EVENT_INTERVAL) {
if (event == CACHE_EVENT_RESPONSE) {
CacheContinuation *ccont = (CacheContinuation *) e;
res = ccont->result;
} else {
CacheOpReplyMsg *rmsg = (CacheOpReplyMsg *) e;
res = rmsg->result;
}
if ((res == CACHE_EVENT_LOOKUP) || (res == CACHE_EVENT_LOOKUP_FAILED)) {
now = ink_get_hrtime();
CLUSTER_SUM_DYN_STAT(CLUSTER_CACHE_LKRMT_CALLBACK_TIME_STAT, now - start_time);
LOG_EVENT_TIME(start_time, lkrmt_callback_time_dist, lkrmt_cache_callbacks);
} else {
now = ink_get_hrtime();
CLUSTER_SUM_DYN_STAT(CLUSTER_CACHE_RMT_CALLBACK_TIME_STAT, now - start_time);
LOG_EVENT_TIME(start_time, rmt_callback_time_dist, rmt_cache_callbacks);
}
}
start_time = 0;
}
// for CACHE_EVENT_RESPONSE/XXX the lock was acquired at the higher level
intptr_t return_error = 0;
ClusterVCToken *pToken = NULL;
retry:
switch (event) {
default:
ink_assert(!"bad case");
return EVENT_DONE;
case EVENT_INTERVAL:{
unsigned int hash = FOLDHASH(target_ip, seq_number);
MUTEX_TRY_LOCK(queuelock, remoteCacheContQueueMutex[hash], e->ethread);
if (!queuelock) {
e->schedule_in(CACHE_RETRY_PERIOD);
return EVENT_CONT;
}
// we are not yet enqueued on the list of outstanding operations
if (!remoteCacheContQueue[hash].in(this)) {
remoteCacheContQueue[hash].enqueue(this);
ink_assert(timeout == e);
MUTEX_RELEASE(queuelock);
e->schedule_in(cache_cluster_timeout);
return EVENT_CONT;
}
// a timeout has occurred
if (find_cache_continuation(seq_number, target_ip)) {
// Valid timeout
MUTEX_RELEASE(queuelock);
Debug("cluster_timeout", "cluster op timeout %d", seq_number);
CLUSTER_INCREMENT_DYN_STAT(CLUSTER_REMOTE_OP_TIMEOUTS_STAT);
request_timeout = true;
timeout = 0;
//
// Post error completion now and defer deallocation of
// the continuation until we receive the reply or the
// target node goes down.
//
if (!action.cancelled)
action.continuation->handleEvent(result, (void *) -ECLUSTER_OP_TIMEOUT);
action.cancelled = 1;
if (target_machine->dead) {
event = CACHE_EVENT_RESPONSE_MSG;
goto retry;
} else {
timeout = e;
e->schedule_in(cache_cluster_timeout);
return EVENT_DONE;
}
} else {
// timeout not expected for continuation; log and ignore
MUTEX_RELEASE(queuelock);
Debug("cluster_timeout", "unknown cluster op timeout %d", seq_number);
Note("Unexpected CacheCont timeout, [%u.%u.%u.%u] seqno=%d", DOT_SEPARATED(target_ip), seq_number);
CLUSTER_INCREMENT_DYN_STAT(CLUSTER_REMOTE_OP_TIMEOUTS_STAT);
return EVENT_DONE;
}
}
case CACHE_EVENT_RESPONSE:
case CACHE_EVENT_RESPONSE_MSG:{
// the response has arrived, cancel timeout
if (timeout) {
timeout->cancel();
timeout = 0;
}
// remove from the pending queue
unsigned int hash = FOLDHASH(target_ip, seq_number);
remoteCacheContQueue[hash].remove(this);
MUTEX_UNTAKE_LOCK(remoteCacheContQueueMutex[hash], this_ethread());
// Fall through
}
case CACHE_EVENT_RESPONSE_RETRY:{
// determine result code
CacheContinuation *c = (CacheContinuation *) e;
CacheOpReplyMsg *msg = (CacheOpReplyMsg *) e;
if (event == CACHE_EVENT_RESPONSE_MSG) {
result = (request_timeout ? result : msg->result);
pToken = (request_timeout ? &token : &msg->token);
} else if (event == CACHE_EVENT_RESPONSE) {
result = (request_timeout ? result : c->result);
pToken = &c->token;
} else if (event == CACHE_EVENT_RESPONSE_RETRY) {
pToken = &token;
} else {
ink_release_assert(!"remoteOpEvent bad event code");
}
// handle response
if (result == CACHE_EVENT_LOOKUP) {
callback_user(result, 0);
return EVENT_DONE;
} else if (event_is_open(result)) {
bool read_op = ((request_opcode == CACHE_OPEN_READ)
|| (request_opcode == CACHE_OPEN_READ_LONG));
if (read_op) {
ink_release_assert(read_cluster_vc->pending_remote_fill > 1);
read_cluster_vc->pending_remote_fill = 0;
have_all_data = pToken->is_clear(); // no conn implies all data
if (have_all_data) {
read_cluster_vc->have_all_data = 1;
} else {
read_cluster_vc->have_all_data = 0;
}
// Move CacheHTTPInfo reply data into VC
read_cluster_vc->marshal_buf = this->getMsgBufferIOBData();
read_cluster_vc->alternate = this->ic_new_info;
this->ic_new_info.clear();
ink_release_assert(read_cluster_vc->alternate.object_size_get());
if (!action.cancelled) {
ClusterVConnection *target_vc = read_cluster_vc;
callback_user(result, target_vc); // "this" is deallocated
target_vc->allow_remote_close();
} else {
read_cluster_vc->allow_remote_close();
read_cluster_vc->do_io(VIO::ABORT);
cacheContAllocator_free(this);
}
} else {
ink_assert(result == CACHE_EVENT_OPEN_WRITE);
ink_assert(!pToken->is_clear());
ClusterVConnection *result_vc = write_cluster_vc;
if (!action.cancelled) {
callback_user(result, result_vc);
result_vc->allow_remote_close();
} else {
result_vc->allow_remote_close();
result_vc->do_io(VIO::ABORT);
cacheContAllocator_free(this);
}
}
return EVENT_DONE;
}
break;
} // End of case
} // End of switch
// Handle failure cases
if (result == CACHE_EVENT_LOOKUP_FAILED) {
// check for local probes
ClusterMachine *m = cluster_machine_at_depth(cache_hash(url_md5));
// if the current configuration indicates that this
// machine is the master (or the owner machine has failed), probe
// the local machine. Do the same if PROBE_LOCAL_CACHE_LAST is set.
//
int len = getMsgBufferLen();
char *hostname = (len ? getMsgBuffer() : 0);
if (!m || PROBE_LOCAL_CACHE_LAST) {
SET_HANDLER((CacheContHandler) & CacheContinuation::probeLookupEvent);
CacheKey key(url_md5);
Cache *call_cache = caches[frag_type];
call_cache->lookup(this, &key, frag_type, hostname, len);
return EVENT_DONE;
}
if (PROBE_LOCAL_CACHE_FIRST) {
callback_user(CACHE_EVENT_LOOKUP_FAILED, 0);
} else {
SET_HANDLER((CacheContHandler) & CacheContinuation::probeLookupEvent);
CacheKey key(url_md5);
Cache *call_cache = caches[frag_type];
call_cache->lookup(this, &key, frag_type, hostname, len);
}
return EVENT_DONE;
} else {
// Handle failure of all ops except for lookup
ClusterVConnection *cacheable_vc = 0;
if ((request_opcode == CACHE_OPEN_READ_LONG) && !pToken->is_clear()) {
ink_assert(read_cluster_vc && !write_cluster_vc);
//
// OPEN_READ_LONG has failed, but the remote node was able to
// establish an OPEN_WRITE_LONG connection.
// Convert the cluster read VC to a write VC and insert it
// into the global write VC cache. This will allow us to
// locally resolve the subsequent OPEN_WRITE_LONG request.
//
// Note: We do not allow remote close on this VC while
// it resides in cache
//
read_cluster_vc->set_type(CLUSTER_OPT_CONN_WRITE);
// FIX ME. ajitb 12/21/99
// Compiler bug in CC: WorkShop Compilers 5.0 98/12/15 C++ 5.0
// Does not accept assignment of ((Continuation *) NULL)
{
Continuation *temp = NULL;
read_cluster_vc->action_ = temp;
}
if (!GlobalOpenWriteVCcache->insert(&url_md5, read_cluster_vc)) {
// Unable to insert VC into cache, try later
cacheable_vc = read_cluster_vc;
}
read_cluster_vc = 0;
}
if (read_cluster_vc) {
read_cluster_vc->remote_closed = 0; // send remote close
read_cluster_vc->allow_remote_close();
read_cluster_vc->do_io(VIO::ABORT);
read_cluster_vc = 0;
}
if (write_cluster_vc) {
write_cluster_vc->remote_closed = 0; // send remote close
write_cluster_vc->allow_remote_close();
write_cluster_vc->do_io(VIO::ABORT);
write_cluster_vc = 0;
}
if (!request_timeout) {
if (!return_error) {
return_error = result_error;
}
if (cacheable_vc) {
insert_cache_callback_user(cacheable_vc, result, (void *) return_error);
} else {
callback_user(result, (void *) return_error);
}
} else {
// callback already made at timeout, just free continuation
if (cacheable_vc) {
cacheable_vc->allow_remote_close();
cacheable_vc->do_io(VIO::CLOSE);
cacheable_vc = 0;
}
cacheContAllocator_free(this);
}
return EVENT_DONE;
}
}
//////////////////////////////////////////////////////////////////////////
// probeLookupEvent()
// After a local probe, return the response to the client and cleanup.
//////////////////////////////////////////////////////////////////////////
int
CacheContinuation::probeLookupEvent(int event, void * /* d ATS_UNUSED */)
{
ink_assert(magicno == (int) MagicNo);
callback_user(event, 0);
return EVENT_DONE;
}
///////////////////////////////////////////////////////////
// lookupEvent()
// Result of a local lookup for PROBE_LOCAL_CACHE_FIRST
///////////////////////////////////////////////////////////
int
CacheContinuation::lookupEvent(int /* event ATS_UNUSED */, void * /* d ATS_UNUSED */)
{
ink_release_assert(!"Invalid call CacheContinuation::lookupEvent");
return EVENT_DONE;
}
//////////////////////////////////////////////////////////////////////////
// do_remote_lookup()
// If the object is supposed to be on a remote machine, probe there.
// Returns: A non-NULL (Action *) if a probe was initiated,
// NULL if no probe was started.
//////////////////////////////////////////////////////////////////////////
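//
// Illustrative call (hypothetical caller; cont, key, hostname and
// hostname_len come from the request being probed, and an HTTP
// fragment type is assumed):
//
//   Action *a =
//     CacheContinuation::do_remote_lookup(cont, &key, NULL,
//                                         CACHE_FRAG_TYPE_HTTP,
//                                         hostname, hostname_len);
//   if (!a) {
//     // No remote probe was initiated; caller should do a local lookup.
//   }
//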
Action *
CacheContinuation::do_remote_lookup(Continuation * cont, CacheKey * key,
CacheContinuation * c, CacheFragType ft, char *hostname, int hostname_len)
{
int probe_depth = 0;
ClusterMachine *past_probes[CONFIGURATION_HISTORY_PROBE_DEPTH] = { 0 };
int mlen = op_to_sizeof_fixedlen_msg(CACHE_LOOKUP_OP) + ((hostname && hostname_len) ? hostname_len : 0);
CacheLookupMsg *msg = (CacheLookupMsg *) ALLOCA_DOUBLE(mlen);
msg->init();
if (key) {
msg->url_md5 = *key;
} else {
ink_assert(c);
msg->url_md5 = c->url_md5;
}
ClusterMachine *m = NULL;
if (cache_migrate_on_demand) {
m = cluster_machine_at_depth(cache_hash(msg->url_md5),
c ? &c->probe_depth : &probe_depth, c ? c->past_probes : past_probes);
} else {
// If migrate-on-demand is off, do not probe beyond one level.
if (c && c->probe_depth)
return (Action *) 0;
m = cluster_machine_at_depth(cache_hash(msg->url_md5));
if (c)
c->probe_depth = 1;
}
if (!m)
return (Action *) 0;
ClusterHandler *ch = m->pop_ClusterHandler();
if (!ch)
return (Action *) 0;
// If we do not have a continuation, build one
if (!c) {
c = cacheContAllocator_alloc();
c->mutex = cont->mutex;
c->probe_depth = probe_depth;
memcpy(c->past_probes, past_probes, sizeof(past_probes));
}
c->ch = ch;
// Save hostname data in case we need to do a local lookup.
if (hostname && hostname_len) {
// Alloc buffer, copy hostname data and attach to continuation
c->setMsgBufferLen(hostname_len);
c->allocMsgBuffer();
memcpy(c->getMsgBuffer(), hostname, hostname_len);
}
c->url_md5 = msg->url_md5;
c->action.cancelled = false;
c->action = cont;
c->start_time = ink_get_hrtime();
SET_CONTINUATION_HANDLER(c, (CacheContHandler)
& CacheContinuation::remoteOpEvent);
c->result = CACHE_EVENT_LOOKUP_FAILED;
// set up sequence number so we can find this continuation
c->target_ip = m->ip;
c->seq_number = new_cache_sequence_number();
msg->seq_number = c->seq_number;
c->frag_type = ft;
msg->frag_type = ft;
// establish timeout for lookup
unsigned int hash = FOLDHASH(c->target_ip, c->seq_number);
MUTEX_TRY_LOCK(queuelock, remoteCacheContQueueMutex[hash], this_ethread());
if (!queuelock) {
// failed to acquire lock: no problem, retry later
c->timeout = eventProcessor.schedule_in(c, CACHE_RETRY_PERIOD, ET_CACHE_CONT_SM);
} else {
remoteCacheContQueue[hash].enqueue(c);
MUTEX_RELEASE(queuelock);
c->timeout = eventProcessor.schedule_in(c, cache_cluster_timeout, ET_CACHE_CONT_SM);
}
char *data;
int len;
int vers = CacheLookupMsg::protoToVersion(m->msg_proto_major);
if (vers == CacheLookupMsg::CACHE_LOOKUP_MESSAGE_VERSION) {
msg->seq_number = c->seq_number;
data = (char *) msg;
len = mlen;
if (hostname && hostname_len) {
memcpy(msg->moi.byte, hostname, hostname_len);
}
} else {
//////////////////////////////////////////////////////////////
// Create the specified down rev version of this message
//////////////////////////////////////////////////////////////
ink_release_assert(!"CacheLookupMsg bad msg version");
}
// send the message
#ifdef CACHE_MSG_TRACE
log_cache_op_sndmsg(msg->seq_number, 0, "cache_lookup");
#endif
clusterProcessor.invoke_remote(c->ch, CACHE_LOOKUP_CLUSTER_FUNCTION, data, len);
return &c->action;
}
////////////////////////////////////////////////////////////////////////////
// cache_lookup_ClusterFunction()
// This function is invoked on a remote machine to do a remote lookup.
// It unmarshals the URL and does a local lookup, with its own
// continuation set to CacheContinuation::replyLookupEvent()
////////////////////////////////////////////////////////////////////////////
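// A seq_number of CACHE_NO_RESPONSE marks a fire-and-forget probe;
// no_reply_message is set below and replyLookupEvent() then skips
// sending the reply.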
void
cache_lookup_ClusterFunction(ClusterHandler *ch, void *data, int len)
{
(void) len;
EThread *thread = this_ethread();
ProxyMutex *mutex = thread->mutex;
////////////////////////////////////////////////////////
// Note: we are running on the ET_CLUSTER thread
////////////////////////////////////////////////////////
CacheLookupMsg *msg = (CacheLookupMsg *) data;
ClusterMessageHeader *mh = (ClusterMessageHeader *) data;
if (mh->GetMsgVersion() != CacheLookupMsg::CACHE_LOOKUP_MESSAGE_VERSION) {
////////////////////////////////////////////////
// Convert from old to current message format
////////////////////////////////////////////////
ink_release_assert(!"cache_lookup_ClusterFunction() bad msg version");
}
if (mh->NeedByteSwap())
msg->SwapBytes();
CLUSTER_INCREMENT_DYN_STAT(CLUSTER_CACHE_OUTSTANDING_STAT);
CacheContinuation *c = CacheContinuation::cacheContAllocator_alloc();
c->mutex = new_ProxyMutex();
MUTEX_TRY_LOCK(lock, c->mutex, this_ethread());
c->no_reply_message = (msg->seq_number == CACHE_NO_RESPONSE);
c->seq_number = msg->seq_number;
c->from = ch->machine;
c->url_md5 = msg->url_md5;
SET_CONTINUATION_HANDLER(c, (CacheContHandler)
& CacheContinuation::replyLookupEvent);
CacheKey key(msg->url_md5);
#ifdef CACHE_MSG_TRACE
log_cache_op_msg(msg->seq_number, 0, "cache_lookup");
#endif
// Extract hostname data if passed.
char *hostname;
int hostname_len = len - op_to_sizeof_fixedlen_msg(CACHE_LOOKUP_OP);
hostname = (hostname_len ? (char *) msg->moi.byte : 0);
// Note: Hostname data invalid after return from lookup
Cache *call_cache = caches[msg->frag_type];
call_cache->lookup(c, &key, (CacheFragType) msg->frag_type, hostname, hostname_len);
}
/////////////////////////////////////////////////////////////////////////
// replyLookupEvent()
// This function handles the result of a lookup on a remote machine.
// It packages up the result and sends it back to the calling machine.
/////////////////////////////////////////////////////////////////////////
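// Note: lookup replies carry no connection token or object data, so
// sizeof(msg->token) is trimmed from the fixed-length reply below.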
int
CacheContinuation::replyLookupEvent(int event, void * /* d ATS_UNUSED */)
{
ink_hrtime now;
now = ink_get_hrtime();
CLUSTER_SUM_DYN_STAT(CLUSTER_CACHE_CALLBACK_TIME_STAT, now - start_time);
LOG_EVENT_TIME(start_time, callback_time_dist, cache_callbacks);
int vers = CacheOpReplyMsg::protoToVersion(from->msg_proto_major);
if (vers == CacheOpReplyMsg::CACHE_OP_REPLY_MESSAGE_VERSION) {
CacheOpReplyMsg *msg;
int flen = CacheOpReplyMsg::sizeof_fixedlen_msg();
msg = (CacheOpReplyMsg *) ALLOCA_DOUBLE(flen);
msg->init();
CLUSTER_DECREMENT_DYN_STAT(CLUSTER_CACHE_OUTSTANDING_STAT);
int len = flen - sizeof(msg->token);
if (!no_reply_message) {
msg->seq_number = seq_number;
msg->result = event;
#ifdef CACHE_MSG_TRACE
log_cache_op_sndmsg(seq_number, event, "cache_result");
#endif
clusterProcessor.invoke_remote(ch, CACHE_OP_RESULT_CLUSTER_FUNCTION, msg, len);
}
} else {
//////////////////////////////////////////////////////////////
// Create the specified down rev version of this message
//////////////////////////////////////////////////////////////
ink_release_assert(!"replyLookupEvent() bad msg version");
}
// Free up everything
cacheContAllocator_free(this);
return EVENT_DONE;
}
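///////////////////////////////////////////////////////////////////////
// getObjectSize()
// Return the object size for the given cache VConnection. If a
// ret_ci which is not yet valid is passed, also synthesize a
// writeable CacheHTTPInfo for it (copied from the VC's info when
// available, otherwise carrying only the object size).
///////////////////////////////////////////////////////////////////////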
int32_t CacheContinuation::getObjectSize(VConnection * vc, int opcode, CacheHTTPInfo * ret_ci)
{
CacheHTTPInfo *ci = 0;
int64_t object_size = 0;
if ((opcode == CACHE_OPEN_READ_LONG)
|| (opcode == CACHE_OPEN_READ_BUFFER_LONG)) {
((CacheVC *) vc)->get_http_info(&ci);
if (ci) {
object_size = ci->object_size_get();
} else {
ci = 0;
object_size = 0;
}
} else {
object_size = ((CacheVC *)vc)->get_object_size();
}
if (ret_ci && !ret_ci->valid()) {
CacheHTTPInfo new_ci;
new_ci.create();
if (ci) {
// Initialize copy
new_ci.copy(ci);
} else {
new_ci.object_size_set(object_size);
}
new_ci.m_alt->m_writeable = 1;
ret_ci->copy_shallow(&new_ci);
}
ink_release_assert(object_size);
return object_size;
}
//////////////////////////////////////////////////////////////////////////
// insert_cache_callback_user()
// Insert write VC into global cache prior to performing user callback.
//////////////////////////////////////////////////////////////////////////
void
CacheContinuation::insert_cache_callback_user(ClusterVConnection * vc, int res, void *e)
{
if (GlobalOpenWriteVCcache->insert(&url_md5, vc)) {
// Inserted
callback_user(res, e);
} else {
// Unable to insert, try later
result = res;
callback_data = e;
callback_data_2 = (void *) vc;
SET_HANDLER((CacheContHandler) & CacheContinuation::insertCallbackEvent);
eventProcessor.schedule_imm(this, ET_CACHE_CONT_SM);
}
}
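///////////////////////////////////////////////////////////////////
// insertCallbackEvent()
// Retry insertion of the write VC into GlobalOpenWriteVCcache
// until it succeeds, then post the deferred user callback.
///////////////////////////////////////////////////////////////////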
int
CacheContinuation::insertCallbackEvent(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
{
if (GlobalOpenWriteVCcache->insert(&url_md5, (ClusterVConnection *)
callback_data_2)) {
// Inserted
callback_user(result, callback_data);
} else {
// Unable to insert, try later
eventProcessor.schedule_imm(this, ET_CACHE_CONT_SM);
}
return EVENT_DONE;
}
///////////////////////////////////////////////////////////////////
// callback_user()
// Invoke handleEvent on the given continuation (cont) with
// considerations for Action.
///////////////////////////////////////////////////////////////////
void
CacheContinuation::callback_user(int res, void *e)
{
EThread *et = this_ethread();
if (!is_ClusterThread(et)) {
MUTEX_TRY_LOCK(lock, mutex, et);
if (lock) {
if (!action.cancelled) {
action.continuation->handleEvent(res, e);
}
cacheContAllocator_free(this);
} else {
// Unable to acquire lock, retry later
defer_callback_result(res, e);
}
} else {
// Can not post completion on ET_CLUSTER thread.
defer_callback_result(res, e);
}
}
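///////////////////////////////////////////////////////////////////
// defer_callback_result()
// Post the user callback from an ET_CACHE_CONT_SM thread when it
// cannot be delivered inline (lock miss or ET_CLUSTER thread).
///////////////////////////////////////////////////////////////////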
void
CacheContinuation::defer_callback_result(int r, void *e)
{
result = r;
callback_data = e;
SET_HANDLER((CacheContHandler) & CacheContinuation::callbackResultEvent);
eventProcessor.schedule_imm(this, ET_CACHE_CONT_SM);
}
int
CacheContinuation::callbackResultEvent(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
{
if (!action.cancelled)
action.continuation->handleEvent(result, callback_data);
cacheContAllocator_free(this);
return EVENT_DONE;
}
//-----------------------------------------------------------------
// CacheContinuation static member functions
//-----------------------------------------------------------------
///////////////////////////////////////////////////////////////////////
// cacheContAllocator_alloc()
///////////////////////////////////////////////////////////////////////
CacheContinuation *
CacheContinuation::cacheContAllocator_alloc()
{
return cacheContAllocator.alloc();
}
///////////////////////////////////////////////////////////////////////
// cacheContAllocator_free()
///////////////////////////////////////////////////////////////////////
void
CacheContinuation::cacheContAllocator_free(CacheContinuation * c)
{
ink_assert(c->magicno == (int) MagicNo);
// ink_assert(!c->cache_op_ClusterFunction);
c->magicno = -1;
#ifdef ENABLE_TIME_TRACE
c->start_time = 0;
#endif
c->free();
c->mutex = NULL;
// FIX ME. ajitb 12/21/99
// Compiler bug in CC: WorkShop Compilers 5.0 98/12/15 C++ 5.0
// Does not accept assignment of ((Continuation *) NULL)
{
Continuation *temp = NULL;
c->action = temp;
}
c->tunnel_mutex = NULL;
cacheContAllocator.free(c);
}
/////////////////////////////////////////////////////////////////////////
// callback_failure()
// Post error completion using a continuation.
/////////////////////////////////////////////////////////////////////////
Action *
CacheContinuation::callback_failure(Action * a, int result, int err, CacheContinuation * this_cc)
{
CacheContinuation *cc;
if (!this_cc) {
cc = cacheContAllocator_alloc();
cc->mutex = a->mutex;
cc->action = *a;
} else {
cc = this_cc;
}
cc->result = result;
cc->result_error = err;
SET_CONTINUATION_HANDLER(cc, (CacheContHandler)
& CacheContinuation::callbackEvent);
eventProcessor.schedule_imm(cc, ET_CACHE_CONT_SM);
return &cc->action;
}
///////////////////////////////////////////////////////////////////////
// callbackEvent()
// Invoke callback and deallocate continuation.
///////////////////////////////////////////////////////////////////////
int
CacheContinuation::callbackEvent(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
{
if (!action.cancelled)
action.continuation->handleEvent(result, (void *)(intptr_t)result_error);
cacheContAllocator_free(this);
return EVENT_DONE;
}
//------------------------------------------------------------------
// File static functions
//------------------------------------------------------------------
////////////////////////////////////////////////////////////////////////
// find_cache_continuation()
// Find a currently pending cache continuation expecting a response.
// Requires taking the lock on the remoteCacheContQueueMutex first.
////////////////////////////////////////////////////////////////////////
static CacheContinuation *
find_cache_continuation(unsigned int seq_number, unsigned int from_ip)
{
unsigned int hash = FOLDHASH(from_ip, seq_number);
CacheContinuation *c = NULL;
CacheContinuation *lastc = NULL;
for (c = (CacheContinuation *) remoteCacheContQueue[hash].head; c; c = (CacheContinuation *) c->link.next) {
if (seq_number == c->seq_number && from_ip == c->target_ip) {
if (lastc) {
ink_release_assert(c->link.prev == lastc);
} else {
ink_release_assert(!c->link.prev);
}
break;
}
lastc = c;
}
return c;
}
/////////////////////////////////////////////////////////////////////////////
// new_cache_sequence_number()
// Generate unique request sequence numbers
/////////////////////////////////////////////////////////////////////////////
static unsigned int
new_cache_sequence_number()
{
unsigned int res = 0;
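// A zero sequence number is reserved (CACHE_NO_RESPONSE marks
// fire-and-forget requests), so retry when the counter wraps to 0.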
do {
res = (unsigned int) ink_atomic_increment(&cluster_sequence_number, 1);
} while (!res);
return res;
}
/***************************************************************************/
#ifdef OMIT
/***************************************************************************/
/////////////////////////////////////////////////////////////////////////////
// forwardEvent()
// for migrate-on-demand, make a connection between
// the node which has the object and the node which should have it.
//
// prepared for either OPEN_READ (from current owner)
// or OPEN_WRITE (from new owner)
/////////////////////////////////////////////////////////////////////////////
int
CacheContinuation::forwardEvent(int event, VConnection * c)
{
int ret = EVENT_CONT;
cluster_vc = 0;
cache_read = false;
switch (event) {
default:
ink_assert(!"bad case");
case CACHE_EVENT_OPEN_WRITE_FAILED:
ret = EVENT_DONE;
break;
case CACHE_EVENT_OPEN_WRITE:
cluster_vc = c;
break;
case CACHE_EVENT_OPEN_READ_FAILED:
cache_read = true;
ret = EVENT_DONE;
break;
case CACHE_EVENT_OPEN_READ:
cache_read = true;
cluster_vc = c;
break;
}
SET_HANDLER((CacheContHandler) & CacheContinuation::forwardWaitEvent);
return ret;
}
////////////////////////////////////////////////////////////////////////
// forwardWaitEvent()
// For migrate-on-demand, handle the second connection (either
// OPEN_READ or OPEN_WRITE) as above (forwardEvent); the data for
// the first is stored in (cluster_vc, cache_read).
////////////////////////////////////////////////////////////////////////
int
CacheContinuation::forwardWaitEvent(int event, VConnection * c)
{
int ret = EVENT_CONT;
int res = CACHE_EVENT_OPEN_READ_FAILED;
void *res_data = NULL;
VConnection *vc = NULL;
switch (event) {
default:
ink_assert(!"bad case");
case CACHE_EVENT_OPEN_WRITE_FAILED:
case CACHE_EVENT_OPEN_READ_FAILED:
ret = EVENT_DONE;
break;
case CACHE_EVENT_OPEN_WRITE:
case CACHE_EVENT_OPEN_READ:
vc = c;
break;
}
VConnection *read_vc = (cache_read ? cluster_vc : vc);
VConnection *write_vc = (!cache_read ? cluster_vc : vc);
res = read_vc ? CACHE_EVENT_OPEN_READ : CACHE_EVENT_OPEN_READ_FAILED;
res_data = read_vc;
// if the read and write are successful, tunnel the read to the write
if (read_vc && write_vc) {
res_data = NEW(new VCTee(read_vc, write_vc, vio));
if (vio) { // CACHE_EVENT_OPEN_READ_VIO
res = event;
res_data = &((VCTee *) read_vc)->vio;
}
}
// if the read is successful, return it to the user
//
c->handleEvent(res, res_data);
return ret;
}
/////////////////////////////////////////////////////////////////////
// tunnelEvent()
// If the reply requires data, tunnel the data from the cache
// to the cluster.
/////////////////////////////////////////////////////////////////////
int
CacheContinuation::tunnelEvent(int event, VConnection * vc)
{
int ret = EVENT_DONE;
int flen = CacheOpReplyMsg::sizeof_fixedlen_msg(); // include token
int len = 0;
bool read_buf = ((request_opcode == CACHE_OPEN_READ_BUFFER)
|| (request_opcode == CACHE_OPEN_READ_BUFFER_LONG));
ink_release_assert(!read_buf);
CacheOpReplyMsg rmsg;
CacheOpReplyMsg *msg = &rmsg;
msg->result = result;
msg->seq_number = seq_number;
msg->token = token;
int expect_reply = 1;
if (event == CLUSTER_EVENT_OPEN) {
if (cache_read) {
if (read_buf) {
ink_assert(have_all_data || (readahead_vio == &((CacheVConnection *) cluster_vc)->vio));
write_cluster_vc = (ClusterVConnection *) vc;
if (have_all_data) {
msg->token.clear(); // Tell sender no conn established
} else {
msg->token = token; // Tell sender conn established
setupReadBufTunnel(cluster_vc, vc);
}
} else {
OneWayTunnel *pOWT = OneWayTunnel::OneWayTunnel_alloc();
pOWT->init(cluster_vc, vc, NULL, nbytes, this->mutex);
--expect_reply;
}
////////////////////////////////////////////////////////
// cache_read requires CacheHTTPInfo in reply message.
////////////////////////////////////////////////////////
int res;
CacheHTTPInfo *ci;
if (!cache_vc_info) {
// OPEN_READ case
(void) getObjectSize(cluster_vc, request_opcode, &cache_vc_info);
}
ci = cache_vc_info;
// Determine data length and allocate
len = ci->marshal_length();
CacheOpReplyMsg *reply = (CacheOpReplyMsg *) ALLOCA_DOUBLE(flen + len);
// Initialize reply message header
*reply = *msg;
// Marshal response data into reply message
res = ci->marshal((char *) reply->moi.byte, len);
ink_assert(res > 0);
// Make reply message the current message
msg = reply;
} else {
OneWayTunnel *pOWT = OneWayTunnelAllocator.alloc();
pOWT->init(vc, cluster_vc, NULL, nbytes, this->mutex);
--expect_reply;
}
ret = EVENT_CONT;
} else {
ink_release_assert(event == CLUSTER_EVENT_OPEN_FAILED);
msg->result = CACHE_EVENT_SET_FAILED(result);
if (read_buf) {
Debug("cluster_timeout", "unable to make cluster connection2");
initial_buf = 0; // Do not send data
initial_bufsize = 0;
if (!have_all_data) {
// Shutdown cache connection and free MIOBuffer
MIOBuffer *mbuf = readahead_vio->buffer.writer();
cluster_vc->do_io(VIO::CLOSE);
free_MIOBuffer(mbuf);
}
} else {
Debug("cluster_timeout", "unable to make cluster connection2A");
cluster_vc->do_io(VIO::CLOSE);
}
len = 0 - (int) sizeof(msg->token);
--expect_reply;
}
int vers = CacheOpReplyMsg::protoToVersion(from->msg_proto_major);
if (vers == CacheOpReplyMsg::CACHE_OP_REPLY_MESSAGE_VERSION) {
if (read_buf) {
// Transmit reply message and object data in same cluster message
clusterProcessor.invoke_remote_data(from,
CACHE_OP_RESULT_CLUSTER_FUNCTION,
(void *) msg, (flen + len),
initial_buf, initial_bufsize,
cluster_vc_channel, &token,
&CacheContinuation::disposeOfDataBuffer, (void *) this, CLUSTER_OPT_STEAL);
} else {
clusterProcessor.invoke_remote(from, CACHE_OP_RESULT_CLUSTER_FUNCTION,
(void *) msg, (flen + len), CLUSTER_OPT_STEAL);
}
} else {
//////////////////////////////////////////////////////////////
// Create the specified down rev version of this message
//////////////////////////////////////////////////////////////
ink_release_assert(!"tunnelEvent() bad msg version");
}
if (expect_reply <= 0)
cacheContAllocator_free(this);
return ret;
}
/////////////////////////////////////////////////////////////////////
// remoteConnectEvent()
// If this was an open, make a connection on this side before
// responding to the user.
/////////////////////////////////////////////////////////////////////
int
CacheContinuation::remoteConnectEvent(int event, VConnection * cvc)
{
ClusterVConnection *vc = (ClusterVConnection *) cvc;
if (event == CLUSTER_EVENT_OPEN) {
if (result == CACHE_EVENT_OPEN_READ) {
// Move CacheHTTPInfo reply data into VC
vc->alternate = this->ic_new_info;
this->ic_new_info.clear();
}
callback_user(result, vc);
return EVENT_CONT;
} else {
Debug("cluster_cache", "unable to make cluster connection");
callback_user(CACHE_EVENT_SET_FAILED(result), vc);
return EVENT_DONE;
}
}
/***************************************************************************/
#endif // OMIT
/***************************************************************************/
// End of ClusterCache.cc