| /** @file |
| |
| Memcached protocol (ASCII and binary) front end for the Traffic Server cache: the MC connection state machine. |
| |
| @section license License |
| |
| Licensed to the Apache Software Foundation (ASF) under one |
| or more contributor license agreements. See the NOTICE file |
| distributed with this work for additional information |
| regarding copyright ownership. The ASF licenses this file |
| to you under the Apache License, Version 2.0 (the |
| "License"); you may not use this file except in compliance |
| with the License. You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, software |
| distributed under the License is distributed on an "AS IS" BASIS, |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| See the License for the specific language governing permissions and |
| limitations under the License. |
| */ |
| |
| #include "tsmemcache.h" |
| #include "I_NetVConnection.h" |
| #include "I_NetProcessor.h" |
| |
| /* |
| TODO |
| - on OPEN_WRITE_FAIL don't poll, figure out another way, and timeout |
| - factor code better, particularly incr/set |
| - MIOBufferAccessor::reader_for |
| - cleanup creader dependency in stream_event |
| */ |
| |
// 30 days expressed in seconds.  The product is parenthesized so the macro
// expands to a single value inside any surrounding expression
// (e.g. `x % REALTIME_MAXDELTA` or `x / REALTIME_MAXDELTA`).
#define REALTIME_MAXDELTA (60 * 60 * 24 * 30)
// Evaluates to non-zero when the text at _s (bounded by _e) is too short to hold
// the literal _c, when STRCMP(_s, _c) is non-zero, or when the character right
// after the literal-sized prefix is not whitespace.  STRCMP is defined outside
// this file section.
#define STRCMP_REST(_c, _s, _e) (((_e) - (_s)) < (int)sizeof(_c) || STRCMP(_s, _c) || !isspace((_s)[sizeof(_c) - 1]))
| |
| ClassAllocator<MC> theMCAllocator("MC"); |
| |
| static time_t base_day_time; |
| |
| // These should be persistent. |
| int32_t MC::verbosity = 0; |
| ink_hrtime MC::last_flush = 0; |
| int64_t MC::next_cas = 1; |
| |
| static void |
| tsmemcache_constants() |
| { |
| struct tm tm; |
| memset(&tm, 0, sizeof(tm)); |
| // jan 1 2010 |
| tm.tm_year = 110; |
| tm.tm_mon = 1; |
| tm.tm_mday = 1; |
| base_day_time = mktime(&tm); |
| ink_assert(base_day_time != (time_t)-1); |
| } |
| |
#ifdef DEBUG
// Scratch buffer shared by every mc_string() call; each call overwrites it.
char debug_string_buffer[TSMEMCACHE_TMP_CMD_BUFFER_SIZE];

/// Copies `s` (of length `len`) into debug_string_buffer with trailing CR/LF
/// characters dropped, truncating to the buffer size, and NUL-terminates it.
/// @return pointer to debug_string_buffer.
static char *
mc_string(const char *s, int len)
{
  int n = len;
  // Trim line terminators from the tail.
  for (; n != 0; --n) {
    const char last = s[n - 1];
    if (last != '\r' && last != '\n') {
      break;
    }
  }
  // Leave room for the terminating NUL.
  const int limit = TSMEMCACHE_TMP_CMD_BUFFER_SIZE - 1;
  if (n > limit) {
    n = limit;
  }
  if (n != 0) {
    memcpy(debug_string_buffer, s, n);
  }
  debug_string_buffer[n] = 0;
  return debug_string_buffer;
}
#endif
| |
// MCDebugBuf(tag, buf, len): in DEBUG builds prints a CR/LF-trimmed copy of the
// buffer when the debug tag is set; otherwise expands to an empty statement.
// Both variants are wrapped in do { } while (0) so the macro behaves as a single
// statement (safe in an unbraced if/else).
// MCDebug: Debug in DEBUG builds, compiled-out (but still type-checked) otherwise.
#ifdef DEBUG
#define MCDebugBuf(_t, _s, _l)                \
  do {                                        \
    if (is_debug_tag_set(_t)) {               \
      printf(_t ": %s\n", mc_string(_s, _l)); \
    }                                         \
  } while (0)
#define MCDebug Debug
#else
#define MCDebugBuf(_t, _s, _l) \
  do {                         \
  } while (0)
#define MCDebug \
  if (0)        \
  Debug
#endif
| |
/// Converts a 64-bit value from host to network (big-endian) byte order.
///
/// The value is serialized most-significant byte first and the resulting bytes
/// are reinterpreted as a uint64_t via memcpy.  On a little-endian host this is
/// a byte swap; on a big-endian host it is the identity.  No union type punning
/// or runtime endianness probe is needed.
///
/// @param in value in host byte order.
/// @return the same value laid out in network byte order.
static uint64_t
ink_hton64(uint64_t in)
{
  uint8_t wire[sizeof(in)];
  for (size_t i = 0; i < sizeof(wire); ++i) {
    // wire[0] receives the most significant byte.
    wire[i] = static_cast<uint8_t>(in >> (8 * (sizeof(wire) - 1 - i)));
  }
  uint64_t out;
  memcpy(&out, wire, sizeof(out));
  return out;
}
// The conversion is its own inverse, so network-to-host uses the same routine.
#define ink_ntoh64 ink_hton64
| |
| int |
| MCAccept::main_event(int event, void *data) |
| { |
| if (event == NET_EVENT_ACCEPT) { |
| NetVConnection *netvc = (NetVConnection *)data; |
| MC *mc = theMCAllocator.alloc(); |
| if (!mutex->thread_holding) { |
| mc->new_connection(netvc, netvc->thread); |
| } else { |
| mc->new_connection(netvc, mutex->thread_holding); |
| } |
| return EVENT_CONT; |
| } else { |
| Fatal("tsmemcache accept received fatal error: errno = %d", -(static_cast<int>((intptr_t)data))); |
| return EVENT_CONT; |
| } |
| } |
| |
| void |
| MC::new_connection(NetVConnection *netvc, EThread *thread) |
| { |
| nvc = netvc; |
| mutex = new_ProxyMutex(); |
| rbuf = new_MIOBuffer(MAX_IOBUFFER_SIZE); |
| rbuf->water_mark = TSMEMCACHE_TMP_CMD_BUFFER_SIZE; |
| reader = rbuf->alloc_reader(); |
| wbuf = new_empty_MIOBuffer(BUFFER_SIZE_INDEX_32K); |
| cbuf = 0; |
| writer = wbuf->alloc_reader(); |
| SCOPED_MUTEX_LOCK(lock, mutex, thread); |
| rvio = nvc->do_io_read(this, INT64_MAX, rbuf); |
| wvio = nvc->do_io_write(this, 0, writer); |
| header.magic = TSMEMCACHE_HEADER_MAGIC; |
| read_from_client(); |
| } |
| |
| int |
| MC::die() |
| { |
| if (pending_action && pending_action != ACTION_RESULT_DONE) { |
| pending_action->cancel(); |
| } |
| if (nvc) { |
| nvc->do_io_close(1); // abort |
| } |
| if (crvc) { |
| crvc->do_io_close(1); // abort |
| } |
| if (cwvc) { |
| cwvc->do_io_close(1); // abort |
| } |
| if (rbuf) { |
| free_MIOBuffer(rbuf); |
| } |
| if (wbuf) { |
| free_MIOBuffer(wbuf); |
| } |
| if (cbuf) { |
| free_MIOBuffer(cbuf); |
| } |
| |
| ats_free(tbuf); |
| |
| mutex = NULL; |
| theMCAllocator.free(this); |
| return EVENT_DONE; |
| } |
| |
| int |
| MC::unexpected_event() |
| { |
| ink_assert(!"unexpected event"); |
| return die(); |
| } |
| |
| int |
| MC::write_then_close(int64_t ntowrite) |
| { |
| SET_HANDLER(&MC::write_then_close_event); |
| return write_to_client(ntowrite); |
| } |
| |
| int |
| MC::write_then_read_from_client(int64_t ntowrite) |
| { |
| SET_HANDLER(&MC::read_from_client_event); |
| return write_to_client(ntowrite); |
| } |
| |
| int |
| MC::stream_then_read_from_client(int64_t ntowrite) |
| { |
| SET_HANDLER(&MC::read_from_client_event); |
| creader = reader; |
| TS_PUSH_HANDLER(&MC::stream_event); |
| return write_to_client(ntowrite); |
| } |
| |
| void |
| MC::add_binary_header(uint16_t err, uint8_t hdr_len, uint16_t key_len, uint32_t body_len) |
| { |
| protocol_binary_response_header r; |
| |
| r.response.magic = static_cast<uint8_t>(PROTOCOL_BINARY_RES); |
| r.response.opcode = binary_header.request.opcode; |
| r.response.keylen = static_cast<uint16_t>(htons(key_len)); |
| r.response.extlen = hdr_len; |
| r.response.datatype = static_cast<uint8_t>(PROTOCOL_BINARY_RAW_BYTES); |
| r.response.status = static_cast<uint16_t>(htons(err)); |
| r.response.bodylen = htonl(body_len); |
| r.response.opaque = binary_header.request.opaque; |
| r.response.cas = ink_hton64(header.cas); |
| |
| wbuf->write(&r, sizeof(r)); |
| } |
| |
| int |
| MC::write_binary_error(protocol_binary_response_status err, int swallow) |
| { |
| const char *errstr = "Unknown error"; |
| switch (err) { |
| case PROTOCOL_BINARY_RESPONSE_ENOMEM: |
| errstr = "Out of memory"; |
| break; |
| case PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND: |
| errstr = "Unknown command"; |
| break; |
| case PROTOCOL_BINARY_RESPONSE_KEY_ENOENT: |
| errstr = "Not found"; |
| break; |
| case PROTOCOL_BINARY_RESPONSE_EINVAL: |
| errstr = "Invalid arguments"; |
| break; |
| case PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS: |
| errstr = "Data exists for key."; |
| break; |
| case PROTOCOL_BINARY_RESPONSE_E2BIG: |
| errstr = "Too large."; |
| break; |
| case PROTOCOL_BINARY_RESPONSE_DELTA_BADVAL: |
| errstr = "Non-numeric server-side value for incr or decr"; |
| break; |
| case PROTOCOL_BINARY_RESPONSE_NOT_STORED: |
| errstr = "Not stored."; |
| break; |
| case PROTOCOL_BINARY_RESPONSE_AUTH_ERROR: |
| errstr = "Auth failure."; |
| break; |
| default: |
| ink_assert(!"unhandled error"); |
| errstr = "UNHANDLED ERROR"; |
| Warning("tsmemcache: unhandled error: %d\n", err); |
| } |
| |
| size_t len = strlen(errstr); |
| add_binary_header(err, 0, 0, len); |
| if (swallow > 0) { |
| int64_t avail = reader->read_avail(); |
| if (avail >= swallow) { |
| reader->consume(swallow); |
| } else { |
| swallow_bytes = swallow - avail; |
| reader->consume(avail); |
| SET_HANDLER(&MC::swallow_then_read_event); |
| } |
| } |
| return 0; |
| } |
| |
| int |
| MC::swallow_then_read_event(int event, void *data) |
| { |
| rvio->nbytes = INT64_MAX; |
| int64_t avail = reader->read_avail(); |
| if (avail >= swallow_bytes) { |
| reader->consume(swallow_bytes); |
| swallow_bytes = 0; |
| return read_from_client(); |
| } else { |
| swallow_bytes -= avail; |
| reader->consume(avail); |
| return EVENT_CONT; |
| } |
| } |
| |
| int |
| MC::swallow_cmd_then_read_from_client_event(int event, void *data) |
| { |
| int64_t avail = reader->read_avail(); |
| if (avail) { |
| int64_t n = reader->memchr('\n'); |
| if (n >= 0) { |
| reader->consume(n + 1); |
| return read_from_client(); |
| } |
| reader->consume(avail); |
| return EVENT_CONT; |
| } |
| return EVENT_CONT; |
| } |
| |
| int |
| MC::protocol_error() |
| { |
| Warning("tsmemcache: protocol error"); |
| return write_then_close(write_binary_error(PROTOCOL_BINARY_RESPONSE_EINVAL, 0)); |
| } |
| |
| int |
| MC::read_from_client() |
| { |
| if (swallow_bytes) { |
| return TS_SET_CALL(&MC::swallow_then_read_event, VC_EVENT_READ_READY, rvio); |
| } |
| read_offset = 0; |
| end_of_cmd = 0; |
| ngets = 0; |
| ff = 0; |
| if (crvc) { |
| crvc->do_io_close(); |
| crvc = 0; |
| crvio = NULL; |
| } |
| if (cwvc) { |
| cwvc->do_io_close(); |
| cwvc = 0; |
| cwvio = NULL; |
| } |
| if (cbuf) { |
| cbuf->clear(); |
| } |
| ink_assert(!crvc && !cwvc); |
| |
| ats_free(tbuf); |
| |
| return TS_SET_CALL(&MC::read_from_client_event, VC_EVENT_READ_READY, rvio); |
| } |
| |
| int |
| MC::write_to_client(int64_t towrite) |
| { |
| (void)towrite; |
| wvio->nbytes = INT64_MAX; |
| wvio->reenable(); |
| return EVENT_CONT; |
| } |
| |
| int |
| MC::write_binary_response(const void *d, int hlen, int keylen, int dlen) |
| { |
| if (!f.noreply || binary_header.request.opcode == PROTOCOL_BINARY_CMD_GETQ || |
| binary_header.request.opcode == PROTOCOL_BINARY_CMD_GETKQ) { |
| add_binary_header(0, hlen, keylen, dlen); |
| if (dlen) { |
| MCDebug("tsmemcache", "response dlen %d\n", dlen); |
| wbuf->write(d, dlen); |
| } else { |
| MCDebug("tsmemcache", "no response\n"); |
| } |
| } |
| return writer->read_avail(); |
| } |
| |
// CHECK_READ_AVAIL(n, h): used inside an event handler that has `event` and
// `data` in scope.  If fewer than `n` bytes are readable from the client:
//   - EOS on the client read VIO, or any unlisted event: the connection dies;
//   - EOS on another VIO, or READ_READY: return EVENT_CONT to wait for data;
//   - WRITE_READY: EVENT_CONT while output is still pending, else EVENT_DONE;
//   - WRITE_COMPLETE: EVENT_DONE.
// The second argument is unused.  `_n` is parenthesized so that any expression
// can be passed safely.
#define CHECK_READ_AVAIL(_n, _h)                      \
  do {                                                \
    if (reader->read_avail() < (_n)) {                \
      switch (event) {                                \
      case VC_EVENT_EOS:                              \
        if ((VIO *)data == rvio)                      \
          break;                                      \
        /* fallthrough */                             \
      case VC_EVENT_READ_READY:                       \
        return EVENT_CONT;                            \
      case VC_EVENT_WRITE_READY:                      \
        if (wvio->buffer.reader()->read_avail() > 0)  \
          return EVENT_CONT;                          \
        /* fallthrough */                             \
      case VC_EVENT_WRITE_COMPLETE:                   \
        return EVENT_DONE;                            \
      default:                                        \
        break;                                        \
      }                                               \
      return die();                                   \
    }                                                 \
  } while (0)
| |
| static char * |
| get_pointer(MC *mc, int start, int len) |
| { |
| if (mc->reader->block_read_avail() >= start + len) { |
| return mc->reader->start() + start; |
| } |
| // the block of data straddles an IOBufferBlock boundary, exceptional case, malloc |
| ink_assert(!mc->tbuf); |
| mc->tbuf = static_cast<char *>(ats_malloc(len)); |
| mc->reader->memcpy(mc->tbuf, len, start); |
| return mc->tbuf; |
| } |
| |
| static inline char * |
| binary_get_key(MC *mc) |
| { |
| return get_pointer(mc, 0, mc->binary_header.request.keylen); |
| } |
| |
| int |
| MC::cache_read_event(int event, void *data) |
| { |
| switch (event) { |
| case CACHE_EVENT_OPEN_READ: { |
| crvc = (CacheVConnection *)data; |
| int hlen = 0; |
| if (crvc->get_header((void **)&rcache_header, &hlen) < 0) { |
| goto Lfail; |
| } |
| if (hlen < static_cast<int>(sizeof(MCCacheHeader)) || rcache_header->magic != TSMEMCACHE_HEADER_MAGIC) { |
| goto Lfail; |
| } |
| if (header.nkey != rcache_header->nkey || hlen < static_cast<int>(sizeof(MCCacheHeader) + rcache_header->nkey)) { |
| goto Lfail; |
| } |
| if (memcmp(key, rcache_header->key(), header.nkey)) { |
| goto Lfail; |
| } |
| { |
| ink_hrtime t = Thread::get_hrtime(); |
| if ((static_cast<ink_hrtime>(rcache_header->settime)) <= last_flush || |
| t >= (static_cast<ink_hrtime>(rcache_header->settime)) + HRTIME_SECONDS(rcache_header->exptime)) { |
| goto Lfail; |
| } |
| } |
| break; |
| Lfail: |
| crvc->do_io_close(); |
| crvc = 0; |
| crvio = NULL; |
| event = CACHE_EVENT_OPEN_READ_FAILED; // convert to failure |
| break; |
| } |
| case VC_EVENT_EOS: |
| case VC_EVENT_ERROR: |
| case CACHE_EVENT_OPEN_READ_FAILED: |
| break; |
| default: |
| return EVENT_CONT; |
| } |
| return TS_POP_CALL(event, data); |
| } |
| |
| int |
| MC::get_item() |
| { |
| TS_PUSH_HANDLER(&MC::cache_read_event); |
| CryptoContext().hash_immediate(cache_key, (void *)key, (int)header.nkey); |
| pending_action = cacheProcessor.open_read(this, &cache_key); |
| return EVENT_CONT; |
| } |
| |
| int |
| MC::set_item() |
| { |
| CryptoContext().hash_immediate(cache_key, (void *)key, (int)header.nkey); |
| pending_action = cacheProcessor.open_write(this, &cache_key, CACHE_FRAG_TYPE_NONE, header.nbytes, |
| CACHE_WRITE_OPT_OVERWRITE | TSMEMCACHE_WRITE_SYNC); |
| return EVENT_CONT; |
| } |
| |
| int |
| MC::delete_item() |
| { |
| CryptoContext().hash_immediate(cache_key, (void *)key, (int)header.nkey); |
| pending_action = cacheProcessor.remove(this, &cache_key, CACHE_FRAG_TYPE_NONE); |
| return EVENT_CONT; |
| } |
| |
| int |
| MC::binary_get_event(int event, void *data) |
| { |
| ink_assert(!"EVENT_ITEM_GOT is incorrect here"); |
| |
| if (event == TSMEMCACHE_EVENT_GOT_ITEM) { |
| return unexpected_event(); |
| } |
| |
| CHECK_READ_AVAIL(binary_header.request.keylen, &MC::binary_get); |
| key = binary_get_key(this); |
| header.nkey = binary_header.request.keylen; |
| |
| return get_item(); |
| } |
| |
| int |
| MC::bin_read_key() |
| { |
| return -1; |
| } |
| |
| int |
| MC::read_binary_from_client_event(int event, void *data) |
| { |
| if (reader->read_avail() < (int)sizeof(binary_header)) { |
| return EVENT_CONT; |
| } |
| reader->memcpy(&binary_header, sizeof(binary_header)); |
| if (binary_header.request.magic != PROTOCOL_BINARY_REQ) { |
| Warning("tsmemcache: bad binary magic: %x", binary_header.request.magic); |
| return die(); |
| } |
| int keylen = binary_header.request.keylen = ntohs(binary_header.request.keylen); |
| int bodylen = binary_header.request.bodylen = ntohl(binary_header.request.bodylen); |
| binary_header.request.cas = ink_ntoh64(binary_header.request.cas); |
| int extlen = binary_header.request.extlen; |
| end_of_cmd = sizeof(binary_header) + extlen; |
| |
| #define CHECK_PROTOCOL(_e) \ |
| if (!(_e)) \ |
| return protocol_error(); |
| |
| MCDebug("tsmemcache", "bin cmd %d\n", binary_header.request.opcode); |
| switch (binary_header.request.opcode) { |
| case PROTOCOL_BINARY_CMD_VERSION: |
| CHECK_PROTOCOL(extlen == 0 && keylen == 0 && bodylen == 0); |
| return write_to_client(write_binary_response(TSMEMCACHE_VERSION, 0, 0, STRLEN(TSMEMCACHE_VERSION))); |
| case PROTOCOL_BINARY_CMD_NOOP: |
| CHECK_PROTOCOL(extlen == 0 && keylen == 0 && bodylen == 0); |
| return write_to_client(write_binary_response(nullptr, 0, 0, 0)); |
| case PROTOCOL_BINARY_CMD_GETKQ: |
| f.noreply = 1; // fall through |
| case PROTOCOL_BINARY_CMD_GETQ: |
| f.noreply = 1; // fall through |
| case PROTOCOL_BINARY_CMD_GETK: |
| case PROTOCOL_BINARY_CMD_GET: |
| CHECK_PROTOCOL(extlen == 0 && (int)bodylen == keylen && keylen > 0); |
| return TS_SET_CALL(&MC::binary_get_event, event, data); |
| case PROTOCOL_BINARY_CMD_APPENDQ: |
| case PROTOCOL_BINARY_CMD_APPEND: |
| f.set_append = 1; |
| goto Lset; |
| case PROTOCOL_BINARY_CMD_PREPENDQ: |
| case PROTOCOL_BINARY_CMD_PREPEND: |
| f.set_prepend = 1; |
| goto Lset; |
| case PROTOCOL_BINARY_CMD_ADDQ: |
| f.noreply = 1; // fall through |
| case PROTOCOL_BINARY_CMD_ADD: |
| CHECK_PROTOCOL(extlen == 8 && keylen != 0 && bodylen >= keylen + 8); |
| f.set_add = 1; |
| goto Lset; |
| case PROTOCOL_BINARY_CMD_REPLACEQ: |
| f.noreply = 1; // fall through |
| case PROTOCOL_BINARY_CMD_REPLACE: |
| CHECK_PROTOCOL(extlen == 8 && keylen != 0 && bodylen >= keylen + 8); |
| f.set_replace = 1; |
| goto Lset; |
| case PROTOCOL_BINARY_CMD_SETQ: |
| f.noreply = 1; // fall through |
| case PROTOCOL_BINARY_CMD_SET: { |
| CHECK_PROTOCOL(extlen == 8 && keylen != 0 && bodylen >= keylen + 8); |
| Lset: |
| if (bin_read_key() < 0) { |
| return EVENT_CONT; |
| } |
| key = binary_get_key(this); |
| header.nkey = keylen; |
| protocol_binary_request_set *req = reinterpret_cast<protocol_binary_request_set *>(&binary_header); |
| req->message.body.flags = ntohl(req->message.body.flags); |
| req->message.body.expiration = ntohl(req->message.body.expiration); |
| nbytes = bodylen - (header.nkey + extlen); |
| break; |
| } |
| case PROTOCOL_BINARY_CMD_DELETEQ: |
| f.noreply = 1; // fall through |
| case PROTOCOL_BINARY_CMD_DELETE: |
| break; |
| case PROTOCOL_BINARY_CMD_INCREMENTQ: |
| f.noreply = 1; // fall through |
| case PROTOCOL_BINARY_CMD_INCREMENT: |
| break; |
| case PROTOCOL_BINARY_CMD_DECREMENTQ: |
| f.noreply = 1; // fall through |
| case PROTOCOL_BINARY_CMD_DECREMENT: |
| break; |
| case PROTOCOL_BINARY_CMD_QUITQ: |
| f.noreply = 1; // fall through |
| case PROTOCOL_BINARY_CMD_QUIT: |
| if (f.noreply) { |
| return die(); |
| } |
| return write_then_close(write_binary_response(nullptr, 0, 0, 0)); |
| case PROTOCOL_BINARY_CMD_FLUSHQ: |
| f.noreply = 1; // fall through |
| case PROTOCOL_BINARY_CMD_FLUSH: |
| break; |
| break; |
| case PROTOCOL_BINARY_CMD_STAT: |
| break; |
| case PROTOCOL_BINARY_CMD_SASL_LIST_MECHS: |
| case PROTOCOL_BINARY_CMD_SASL_AUTH: |
| case PROTOCOL_BINARY_CMD_SASL_STEP: |
| Warning("tsmemcache: sasl not (yet) supported"); |
| return die(); |
| case PROTOCOL_BINARY_CMD_RGET: |
| case PROTOCOL_BINARY_CMD_RSET: |
| case PROTOCOL_BINARY_CMD_RSETQ: |
| case PROTOCOL_BINARY_CMD_RAPPEND: |
| case PROTOCOL_BINARY_CMD_RAPPENDQ: |
| case PROTOCOL_BINARY_CMD_RPREPEND: |
| case PROTOCOL_BINARY_CMD_RPREPENDQ: |
| case PROTOCOL_BINARY_CMD_RDELETE: |
| case PROTOCOL_BINARY_CMD_RDELETEQ: |
| case PROTOCOL_BINARY_CMD_RINCR: |
| case PROTOCOL_BINARY_CMD_RINCRQ: |
| case PROTOCOL_BINARY_CMD_RDECR: |
| case PROTOCOL_BINARY_CMD_RDECRQ: |
| Warning("tsmemcache: range not (yet) supported"); |
| return die(); |
| default: |
| Warning("tsmemcache: unexpected binary opcode %x", binary_header.request.opcode); |
| return die(); |
| } |
| return EVENT_CONT; |
| } |
| |
| int |
| MC::ascii_response(const char *s, int len) |
| { |
| if (!f.noreply) { |
| wbuf->write(s, len); |
| wvio->nbytes = INT64_MAX; |
| wvio->reenable(); |
| MCDebugBuf("tsmemcache_ascii_response", s, len); |
| } |
| if (end_of_cmd > 0) { |
| reader->consume(end_of_cmd); |
| return read_from_client(); |
| } else if (end_of_cmd < 0) { |
| return read_from_client(); |
| } else { |
| return TS_SET_CALL(&MC::swallow_cmd_then_read_from_client_event, EVENT_NONE, NULL); |
| } |
| } |
| |
| char * |
| MC::get_ascii_input(int n, int *end) |
| { |
| int block_read_avail = reader->block_read_avail(); |
| if (block_read_avail >= n) { |
| Lblock: |
| *end = block_read_avail; |
| return reader->start(); |
| } |
| int read_avail = reader->read_avail(); |
| if (block_read_avail == read_avail) { |
| goto Lblock; |
| } |
| char *c = tmp_cmd_buffer; |
| int e = read_avail; |
| if (e > n) { |
| e = n; |
| } |
| reader->memcpy(c, e); |
| *end = e; |
| return c; |
| } |
| |
| int |
| MC::ascii_get_event(int event, void *data) |
| { |
| switch (event) { |
| case CACHE_EVENT_OPEN_READ_FAILED: |
| reader->consume(read_offset); |
| read_offset = 0; |
| break; |
| case CACHE_EVENT_OPEN_READ: { |
| wbuf->WRITE("VALUE "); |
| wbuf->write(key, header.nkey); |
| wbuf->WRITE(" "); |
| char t[32], *te = t + 32; |
| char *flags = xutoa(rcache_header->flags, te); |
| wbuf->write(flags, te - flags); |
| wbuf->WRITE(" "); |
| char *bytes = xutoa(rcache_header->nbytes, te); |
| wbuf->write(bytes, te - bytes); |
| if (f.return_cas) { |
| wbuf->WRITE(" "); |
| char *pcas = xutoa(rcache_header->cas, te); |
| wbuf->write(pcas, te - pcas); |
| } |
| wbuf->WRITE("\r\n"); |
| int ntowrite = writer->read_avail() + rcache_header->nbytes; |
| crvio = crvc->do_io_read(this, rcache_header->nbytes, wbuf); |
| creader = reader; |
| TS_PUSH_HANDLER(&MC::stream_event); |
| return write_to_client(ntowrite); |
| } |
| case TSMEMCACHE_STREAM_DONE: |
| crvc->do_io_close(); |
| crvc = 0; |
| crvio = NULL; |
| reader->consume(read_offset); |
| read_offset = 0; |
| wbuf->WRITE("\r\n"); |
| return ascii_gets(); |
| default: |
| break; |
| } |
| return ascii_gets(); |
| } |
| |
| int |
| MC::ascii_set_event(int event, void *data) |
| { |
| switch (event) { |
| case CACHE_EVENT_OPEN_WRITE_FAILED: |
| // another write currently in progress |
| mutex->thread_holding->schedule_in(this, TSMEMCACHE_RETRY_WRITE_INTERVAL); |
| return EVENT_CONT; |
| case EVENT_INTERVAL: |
| return read_from_client(); |
| case CACHE_EVENT_OPEN_WRITE: { |
| cwvc = (CacheVConnection *)data; |
| int hlen = 0; |
| if (cwvc->get_header((void **)&wcache_header, &hlen) >= 0) { |
| if (hlen < static_cast<int>(sizeof(MCCacheHeader)) || wcache_header->magic != TSMEMCACHE_HEADER_MAGIC) { |
| goto Lfail; |
| } |
| if (header.nkey != wcache_header->nkey || hlen < static_cast<int>(sizeof(MCCacheHeader) + wcache_header->nkey)) { |
| goto Lfail; |
| } |
| ink_hrtime t = Thread::get_hrtime(); |
| if ((static_cast<ink_hrtime>(wcache_header->settime)) <= last_flush || |
| t >= (static_cast<ink_hrtime>(wcache_header->settime)) + HRTIME_SECONDS(wcache_header->exptime)) { |
| goto Lstale; |
| } |
| if (f.set_add) { |
| return ASCII_RESPONSE("NOT_STORED"); |
| } |
| } else { |
| Lstale: |
| if (f.set_replace) { |
| return ASCII_RESPONSE("NOT_STORED"); |
| } |
| } |
| memcpy(tmp_cache_header_key, key, header.nkey); |
| header.settime = Thread::get_hrtime(); |
| if (exptime) { |
| if (exptime > REALTIME_MAXDELTA) { |
| if (HRTIME_SECONDS(exptime) <= (static_cast<ink_hrtime>(header.settime))) { |
| header.exptime = 0; |
| } else { |
| header.exptime = static_cast<int32_t>(exptime - (header.settime / HRTIME_SECOND)); |
| } |
| } else { |
| header.exptime = exptime; |
| } |
| } else { |
| header.exptime = UINT32_MAX; // 136 years |
| } |
| if (f.set_cas) { |
| if (!wcache_header) { |
| return ASCII_RESPONSE("NOT_FOUND"); |
| } |
| if (header.cas && header.cas != wcache_header->cas) { |
| return ASCII_RESPONSE("EXISTS"); |
| } |
| } |
| header.cas = ink_atomic_increment(&next_cas, 1); |
| if (f.set_append || f.set_prepend) { |
| header.nbytes = nbytes + rcache_header->nbytes; |
| } else { |
| header.nbytes = nbytes; |
| } |
| cwvc->set_header(&header, header.len()); |
| reader->consume(end_of_cmd); |
| end_of_cmd = -1; |
| swallow_bytes = 2; // \r\n |
| if (f.set_append) { |
| TS_PUSH_HANDLER(&MC::tunnel_event); |
| if (!cbuf) { |
| cbuf = new_empty_MIOBuffer(BUFFER_SIZE_INDEX_32K); |
| } |
| creader = cbuf->alloc_reader(); |
| crvio = crvc->do_io_read(this, rcache_header->nbytes, cbuf); |
| cwvio = cwvc->do_io_write(this, header.nbytes, creader); |
| } else { |
| if (f.set_prepend) { |
| int64_t a = reader->read_avail(); |
| if (a >= static_cast<int64_t>(nbytes)) { |
| a = static_cast<int64_t>(nbytes); |
| } |
| if (!cbuf) { |
| cbuf = new_empty_MIOBuffer(BUFFER_SIZE_INDEX_32K); |
| } |
| creader = cbuf->alloc_reader(); |
| if (a) { |
| cbuf->write(reader, a); |
| reader->consume(a); |
| } |
| if (a == static_cast<int64_t>(nbytes)) { |
| cwvio = cwvc->do_io_write(this, header.nbytes, creader); |
| goto Lstreamdone; |
| } |
| rvio->nbytes = rvio->ndone + (int64_t)nbytes - a; |
| } else { |
| creader = reader; |
| } |
| TS_PUSH_HANDLER(&MC::stream_event); |
| cwvio = cwvc->do_io_write(this, header.nbytes, creader); |
| } |
| return EVENT_CONT; |
| } |
| case TSMEMCACHE_STREAM_DONE: |
| rvio->nbytes = UINT64_MAX; |
| Lstreamdone: |
| if (f.set_prepend) { |
| TS_PUSH_HANDLER(&MC::tunnel_event); |
| crvio = crvc->do_io_read(this, rcache_header->nbytes, cbuf); |
| return EVENT_CONT; |
| } |
| return ASCII_RESPONSE("STORED"); |
| case TSMEMCACHE_TUNNEL_DONE: |
| crvc->do_io_close(); |
| crvc = 0; |
| crvio = NULL; |
| if (f.set_append) { |
| int64_t a = reader->read_avail(); |
| if (a > static_cast<int64_t>(nbytes)) { |
| a = static_cast<int64_t>(nbytes); |
| } |
| if (a) { |
| cbuf->write(reader, a); |
| reader->consume(a); |
| } |
| TS_PUSH_HANDLER(&MC::stream_event); |
| return handleEvent(VC_EVENT_READ_READY, rvio); |
| } |
| ink_assert(f.set_prepend); |
| cwvc->do_io_close(); |
| cwvc = 0; |
| return ASCII_RESPONSE("STORED"); |
| case CACHE_EVENT_OPEN_READ_FAILED: |
| swallow_bytes = nbytes + 2; |
| return ASCII_RESPONSE("NOT_STORED"); |
| case CACHE_EVENT_OPEN_READ: |
| crvc = (CacheVConnection *)data; |
| return set_item(); |
| default: |
| break; |
| } |
| return EVENT_CONT; |
| Lfail: |
| Warning("tsmemcache: bad cache data"); |
| return ASCII_SERVER_ERROR(""); |
| } |
| |
| int |
| MC::ascii_delete_event(int event, void *data) |
| { |
| switch (event) { |
| case CACHE_EVENT_REMOVE_FAILED: |
| return ASCII_RESPONSE("NOT_FOUND"); |
| case CACHE_EVENT_REMOVE: |
| return ASCII_RESPONSE("DELETED"); |
| default: |
| return EVENT_CONT; |
| } |
| } |
| |
| int |
| MC::ascii_incr_decr_event(int event, void *data) |
| { |
| switch (event) { |
| case CACHE_EVENT_OPEN_WRITE_FAILED: |
| // another write currently in progress |
| mutex->thread_holding->schedule_in(this, TSMEMCACHE_RETRY_WRITE_INTERVAL); |
| return EVENT_CONT; |
| case EVENT_INTERVAL: |
| return read_from_client(); |
| case CACHE_EVENT_OPEN_WRITE: { |
| int hlen = 0; |
| cwvc = (CacheVConnection *)data; |
| { |
| if (cwvc->get_header((void **)&wcache_header, &hlen) >= 0) { |
| if (hlen < static_cast<int>(sizeof(MCCacheHeader)) || wcache_header->magic != TSMEMCACHE_HEADER_MAGIC) { |
| goto Lfail; |
| } |
| if (header.nkey != wcache_header->nkey || hlen < static_cast<int>(sizeof(MCCacheHeader) + wcache_header->nkey)) { |
| goto Lfail; |
| } |
| ink_hrtime t = Thread::get_hrtime(); |
| if ((static_cast<ink_hrtime>(wcache_header->settime)) <= last_flush || |
| t >= (static_cast<ink_hrtime>(wcache_header->settime)) + HRTIME_SECONDS(wcache_header->exptime)) { |
| goto Lfail; |
| } |
| } else { |
| goto Lfail; |
| } |
| memcpy(tmp_cache_header_key, key, header.nkey); |
| header.settime = Thread::get_hrtime(); |
| if (exptime) { |
| if (exptime > REALTIME_MAXDELTA) { |
| if (HRTIME_SECONDS(exptime) <= (static_cast<ink_hrtime>(header.settime))) { |
| header.exptime = 0; |
| } else { |
| header.exptime = static_cast<int32_t>(exptime - (header.settime / HRTIME_SECOND)); |
| } |
| } else { |
| header.exptime = exptime; |
| } |
| } else { |
| header.exptime = UINT32_MAX; // 136 years |
| } |
| } |
| header.cas = ink_atomic_increment(&next_cas, 1); |
| { |
| char *localdata = nullptr; |
| int len = 0; |
| // must be huge, why convert to a counter ?? |
| if (cwvc->get_single_data((void **)&localdata, &len) < 0) { |
| goto Lfail; |
| } |
| uint64_t new_value = xatoull(localdata, localdata + len); |
| if (f.set_incr) { |
| new_value += delta; |
| } else { |
| if (delta > new_value) { |
| new_value = 0; |
| } else { |
| new_value -= delta; |
| } |
| } |
| char new_value_str_buffer[32], *e = &new_value_str_buffer[30]; |
| e[0] = '\r'; |
| e[1] = '\n'; |
| char *s = xutoa(new_value, e); |
| creader = wbuf->clone_reader(writer); |
| wbuf->write(s, e - s + 2); |
| if (f.noreply) { |
| writer->consume(e - s + 2); |
| } else { |
| wvio->reenable(); |
| } |
| MCDebugBuf("tsmemcache_ascii_response", s, e - s + 2); |
| header.nbytes = e - s; |
| cwvc->set_header(&header, header.len()); |
| TS_PUSH_HANDLER(&MC::stream_event); |
| cwvio = cwvc->do_io_write(this, header.nbytes, creader); |
| } |
| return EVENT_CONT; |
| } |
| case TSMEMCACHE_STREAM_DONE: { |
| wbuf->dealloc_reader(creader); |
| creader = 0; |
| reader->consume(end_of_cmd); |
| return read_from_client(); |
| } |
| default: |
| break; |
| } |
| return EVENT_CONT; |
| Lfail: |
| Warning("tsmemcache: bad cache data"); |
| return ASCII_RESPONSE("NOT_FOUND"); |
| } |
| |
| int |
| MC::get_ascii_key(char *as, char *e) |
| { |
| char *s = as; |
| // skip space |
| while (*s == ' ') { |
| s++; |
| if (s >= e) { |
| if (as - e >= TSMEMCACHE_TMP_CMD_BUFFER_SIZE) { |
| return ASCII_CLIENT_ERROR("bad command line"); |
| } |
| return EVENT_CONT; |
| } |
| } |
| // grab key |
| key = s; |
| while (!isspace(*s)) { |
| if (s >= e) { |
| if (as - e >= TSMEMCACHE_TMP_CMD_BUFFER_SIZE) { |
| return ASCII_RESPONSE("key too large"); |
| } |
| return EVENT_CONT; |
| } |
| s++; |
| } |
| if (s - key > TSMEMCACHE_MAX_KEY_LEN) { |
| return ASCII_CLIENT_ERROR("bad command line"); |
| } |
| header.nkey = s - key; |
| if (!header.nkey) { |
| if (e - s >= 2) { |
| if (*s == '\r') { |
| s++; |
| } |
| if (*s == '\n' && ngets) { |
| return ASCII_RESPONSE("END"); |
| } |
| return ASCII_CLIENT_ERROR("bad command line"); |
| } |
| return EVENT_CONT; // get some more |
| } |
| read_offset = s - as; |
| return TSMEMCACHE_EVENT_GOT_KEY; |
| } |
| |
| int |
| MC::ascii_get(char *as, char *e) |
| { |
| SET_HANDLER(&MC::ascii_get_event); |
| CHECK_RET(get_ascii_key(as, e), TSMEMCACHE_EVENT_GOT_KEY); |
| ngets++; |
| return get_item(); |
| } |
| |
| int |
| MC::ascii_gets() |
| { |
| int len = 0; |
| char *c = get_ascii_input(TSMEMCACHE_TMP_CMD_BUFFER_SIZE, &len); |
| return ascii_get(c, c + len); |
| } |
| |
// Advances `s` past any run of ' ' characters.  Returns a "bad command line"
// client error from the enclosing function if the end `e` is reached.
// Expects `s` and `e` to be in scope.
#define SKIP_SPACE                                   \
  do {                                               \
    for (; *s == ' ';) {                             \
      if (++s >= e)                                  \
        return ASCII_CLIENT_ERROR("bad command line"); \
    }                                                \
  } while (0)
| |
// Advances `s` past any run of non-whitespace characters.  Returns a
// "bad command line" client error from the enclosing function if the end `e`
// is reached.  Expects `s` and `e` to be in scope.
#define SKIP_TOKEN                                   \
  do {                                               \
    for (; !isspace(*s);) {                          \
      if (++s >= e)                                  \
        return ASCII_CLIENT_ERROR("bad command line"); \
    }                                                \
  } while (0)
| |
// Parses an unsigned decimal number at `s` into _n (0 when there are no digits)
// and leaves `s` on the first non-digit.  Returns a "bad command line" client
// error from the enclosing function if the end `e` is reached while consuming
// digits.  Expects `s` and `e` to be in scope.
#define GET_NUM(_n)                                  \
  do {                                               \
    _n = 0;                                          \
    while (isdigit(*s)) {                            \
      _n = _n * 10 + (*s - '0');                     \
      s++;                                           \
      if (s >= e)                                    \
        return ASCII_CLIENT_ERROR("bad command line"); \
    }                                                \
  } while (0)
| |
// Parses an optionally '-'-prefixed decimal number at `s` into _n (0 when there
// are no digits) and leaves `s` on the first non-digit.  Returns a
// "bad command line" client error from the enclosing function if the end `e`
// is reached while consuming digits.  Expects `s` and `e` to be in scope.
#define GET_SNUM(_n)                                 \
  do {                                               \
    const bool is_negative = (*s == '-');            \
    if (is_negative)                                 \
      s++;                                           \
    _n = 0;                                          \
    while (isdigit(*s)) {                            \
      _n = _n * 10 + (*s - '0');                     \
      s++;                                           \
      if (s >= e)                                    \
        return ASCII_CLIENT_ERROR("bad command line"); \
    }                                                \
    if (is_negative)                                 \
      _n = -_n;                                      \
  } while (0)
| |
| int |
| MC::ascii_set(char *s, char *e) |
| { |
| SKIP_SPACE; |
| key = s; |
| SKIP_TOKEN; |
| header.nkey = s - key; |
| SKIP_SPACE; |
| GET_NUM(header.flags); |
| SKIP_SPACE; |
| GET_SNUM(exptime); |
| SKIP_SPACE; |
| GET_NUM(nbytes); |
| swallow_bytes = nbytes + 2; // assume failure |
| if (f.set_cas) { |
| SKIP_SPACE; |
| GET_NUM(header.cas); |
| } else { |
| header.cas = 0; |
| } |
| SKIP_SPACE; |
| if (*s == 'n' && !STRCMP_REST("oreply", s + 1, e)) { |
| f.noreply = 1; |
| s += 7; |
| if (s >= e) { |
| return ASCII_CLIENT_ERROR("bad command line"); |
| } |
| SKIP_SPACE; |
| } |
| if (*s == '\r') { |
| s++; |
| } |
| if (*s == '\n') { |
| s++; |
| } |
| if (s != e) { |
| return ASCII_CLIENT_ERROR("bad command line"); |
| } |
| SET_HANDLER(&MC::ascii_set_event); |
| if (f.set_append || f.set_prepend) { |
| return get_item(); |
| } else { |
| return set_item(); |
| } |
| } |
| |
| int |
| MC::ascii_delete(char *s, char *e) |
| { |
| SKIP_SPACE; |
| key = s; |
| SKIP_TOKEN; |
| header.nkey = s - key; |
| SKIP_SPACE; |
| if (*s == 'n' && !STRCMP_REST("oreply", s + 1, e)) { |
| f.noreply = 1; |
| s += 7; |
| if (s >= e) { |
| return ASCII_CLIENT_ERROR("bad command line"); |
| } |
| SKIP_SPACE; |
| } |
| if (*s == '0') { |
| s++; |
| } |
| if (*s == '\r') { |
| s++; |
| } |
| if (*s == '\n') { |
| s++; |
| } |
| if (s != e) { |
| return ASCII_CLIENT_ERROR("bad command line"); |
| } |
| SET_HANDLER(&MC::ascii_delete_event); |
| return delete_item(); |
| } |
| |
| int |
| MC::ascii_incr_decr(char *s, char *e) |
| { |
| SKIP_SPACE; |
| key = s; |
| SKIP_TOKEN; |
| header.nkey = s - key; |
| SKIP_SPACE; |
| GET_NUM(delta); |
| SKIP_SPACE; |
| if (*s == 'n' && !STRCMP_REST("oreply", s + 1, e)) { |
| f.noreply = 1; |
| s += 7; |
| if (s >= e) { |
| return ASCII_CLIENT_ERROR("bad command line"); |
| } |
| SKIP_SPACE; |
| } |
| if (*s == '\r') { |
| s++; |
| } |
| if (*s == '\n') { |
| s++; |
| } |
| if (s != e) { |
| return ASCII_CLIENT_ERROR("bad command line"); |
| } |
| SET_HANDLER(&MC::ascii_incr_decr_event); |
| return set_item(); |
| } |
| |
// Checks whether only line-ending material remains in [t, e).
//
// Skips any run of spaces and then at most one '\r'. Returns 1 when exactly
// one character is left before e after that (callers pass e just past the
// terminating '\n', so that character is presumably the newline; it is not
// inspected here). Returns 0 otherwise.
//
// Every dereference is guarded by t < e so nothing at or beyond e is read.
static int
is_end_of_cmd(char *t, char *e)
{
  while (t < e && *t == ' ') {
    t++; // skip spaces
  }
  if (t < e && *t == '\r') {
    t++;
  }
  return (t == e - 1) ? 1 : 0;
}
| |
| // moves *pt past the noreply if it is found |
| static int |
| is_noreply(char **pt, char *e) |
| { |
| char *t = *pt; |
| if (t < e - 8) { |
| while (*t == ' ') { |
| if (t > e - 8) { |
| return 0; |
| } |
| t++; |
| } |
| if (t[0] == 'n' && !STRCMP(t + 1, "oreply") && isspace(t[7])) { |
| *pt = t + sizeof("noreply") - 1; |
| return 1; |
| } |
| } |
| return 0; |
| } |
| |
| int |
| MC::read_ascii_from_client_event(int event, void *data) |
| { |
| int len = 0; |
| char *c = get_ascii_input(TSMEMCACHE_TMP_CMD_BUFFER_SIZE, &len), *s = c; |
| MCDebugBuf("tsmemcache_ascii_cmd", c, len); |
| char *e = c + len - 5; // at least 6 chars |
| while (*s == ' ' && s < e) { |
| s++; // skip leading spaces |
| } |
| if (s >= e) { |
| if (len >= TSMEMCACHE_TMP_CMD_BUFFER_SIZE || memchr(c, '\n', len)) { |
| return ASCII_CLIENT_ERROR("bad command line"); |
| } |
| return EVENT_CONT; |
| } |
| // gets can be large, so do not require the full cmd fit in the buffer |
| e = c + len; |
| switch (*s) { |
| case 'g': // get gets |
| if (s[3] == 's' && s[4] == ' ') { |
| f.return_cas = 1; |
| read_offset = 5; |
| goto Lget; |
| } else if (s[3] == ' ') { |
| read_offset = 4; |
| Lget: |
| reader->consume(read_offset); |
| if (c != tmp_cmd_buffer) { // all in the block |
| return ascii_get(s + read_offset, e); |
| } else { |
| return ascii_gets(); |
| } |
| } |
| break; |
| case 'b': // bget |
| if (s[4] != ' ') { |
| break; |
| } |
| read_offset = 5; |
| goto Lget; |
| break; |
| default: |
| break; |
| } |
| // find the end of the command |
| e = static_cast<char *>(memchr(s, '\n', len)); |
| if (!e) { |
| if (reader->read_avail() > TSMEMCACHE_MAX_CMD_SIZE) { |
| return ASCII_CLIENT_ERROR("bad command line"); |
| } |
| return EVENT_CONT; |
| } |
| e++; // skip nl |
| end_of_cmd = e - c; |
| switch (*s) { |
| case 's': // set stats |
| if (s[1] == 'e' && s[2] == 't' && s[3] == ' ') { |
| return ascii_set(s + sizeof("set") - 1, e); |
| } |
| if (STRCMP_REST("tats", s + 1, e)) { |
| break; |
| } |
| s += sizeof("stats") - 1; |
| if (is_noreply(&s, e)) { |
| break; // to please memcapable |
| } else { |
| return ASCII_RESPONSE("END"); |
| } |
| case 'a': // add |
| if (s[1] == 'd' && s[2] == 'd' && s[3] == ' ') { |
| f.set_add = 1; |
| return ascii_set(s + sizeof("add") - 1, e); |
| } |
| if (STRCMP_REST("ppend", s + 1, e)) { |
| break; |
| } |
| f.set_append = 1; |
| return ascii_set(s + sizeof("append") - 1, e); |
| case 'p': // prepend |
| if (STRCMP_REST("repend", s + 1, e)) { |
| break; |
| } |
| f.set_prepend = 1; |
| return ascii_set(s + sizeof("prepend") - 1, e); |
| case 'c': // cas |
| if (s[1] == 'a' && s[2] == 's' && s[3] == ' ') { |
| f.set_cas = 1; |
| return ascii_set(s + sizeof("cas") - 1, e); |
| } |
| break; |
| case 'i': // incr |
| if (s[1] == 'n' && s[2] == 'c' && s[3] == 'r' && s[4] == ' ') { |
| f.set_incr = 1; |
| return ascii_incr_decr(s + sizeof("incr") - 1, e); |
| } |
| break; |
| case 'f': { // flush_all |
| if (STRCMP_REST("lush_all", s + 1, e)) { |
| break; |
| } |
| s += sizeof("flush_all") - 1; |
| SKIP_SPACE; |
| int32_t time_offset = 0; |
| if (isdigit(*s)) { |
| GET_NUM(time_offset); |
| } |
| f.noreply = is_noreply(&s, e); |
| ink_hrtime new_last_flush = Thread::get_hrtime() + HRTIME_SECONDS(time_offset); |
| #if __WORDSIZE == 64 |
| last_flush = new_last_flush; // this will be atomic for native word size |
| #else |
| ink_atomic_swap(&last_flush, new_last_flush); |
| #endif |
| if (!is_end_of_cmd(s, e)) { |
| break; |
| } |
| return ASCII_RESPONSE("OK"); |
| } |
| case 'd': // delete decr |
| if (e - s < 5) { |
| break; |
| } |
| if (s[2] == 'l') { |
| if (s[1] == 'e' && s[3] == 'e' && s[4] == 't' && s[5] == 'e' && s[6] == ' ') { |
| return ascii_delete(s + sizeof("delete") - 1, e); |
| } |
| } else if (s[1] == 'e' && s[2] == 'c' && s[3] == 'r' && s[4] == ' ') { // decr |
| f.set_decr = 1; |
| return ascii_incr_decr(s + sizeof("decr") - 1, e); |
| } |
| break; |
| case 'r': // replace |
| if (STRCMP_REST("eplace", s + 1, e)) { |
| break; |
| } |
| f.set_replace = 1; |
| return ascii_set(s + sizeof("replace") - 1, e); |
| case 'q': // quit |
| if (STRCMP_REST("uit", s + 1, e)) { |
| break; |
| } |
| if (!is_end_of_cmd(s + sizeof("quit") - 1, e)) { |
| break; |
| } |
| return die(); |
| case 'v': { // version |
| if (s[3] == 's') { |
| if (STRCMP_REST("ersion", s + 1, e)) { |
| break; |
| } |
| if (!is_end_of_cmd(s + sizeof("version") - 1, e)) { |
| break; |
| } |
| return ASCII_RESPONSE("VERSION " TSMEMCACHE_VERSION); |
| } else if (s[3] == 'b') { |
| if (STRCMP_REST("erbosity", s + 1, e)) { |
| break; |
| } |
| s += sizeof("verbosity") - 1; |
| SKIP_SPACE; |
| if (!isdigit(*s)) { |
| break; |
| } |
| GET_NUM(verbosity); |
| f.noreply = is_noreply(&s, e); |
| if (!is_end_of_cmd(s, e)) { |
| break; |
| } |
| return ASCII_RESPONSE("OK"); |
| } |
| break; |
| } |
| } |
| return ASCII_ERROR(); |
| } |
| |
| int |
| MC::write_then_close_event(int event, void *data) |
| { |
| switch (event) { |
| case VC_EVENT_EOS: |
| if ((VIO *)data == wvio) { |
| break; |
| } |
| // fall through |
| case VC_EVENT_READ_READY: |
| return EVENT_DONE; // no more of that stuff |
| case VC_EVENT_WRITE_READY: |
| if (wvio->buffer.reader()->read_avail() > 0) { |
| return EVENT_CONT; |
| } |
| break; |
| default: |
| break; |
| } |
| return die(); |
| } |
| |
| int |
| MC::read_from_client_event(int event, void *data) |
| { |
| switch (event) { |
| case TSMEMCACHE_STREAM_DONE: |
| return read_from_client(); |
| case VC_EVENT_READ_READY: |
| case VC_EVENT_EOS: |
| if (reader->read_avail() < 1) { |
| return EVENT_CONT; |
| } |
| if ((uint8_t)reader->start()[0] == (uint8_t)PROTOCOL_BINARY_REQ) { |
| return TS_SET_CALL(&MC::read_binary_from_client_event, event, data); |
| } else { |
| return TS_SET_CALL(&MC::read_ascii_from_client_event, event, data); |
| } |
| case VC_EVENT_WRITE_READY: |
| case VC_EVENT_WRITE_COMPLETE: |
| break; |
| default: |
| return die(); |
| } |
| return EVENT_CONT; |
| } |
| |
| // between client and cache |
| int |
| MC::stream_event(int event, void *data) |
| { |
| if (data == crvio || data == cwvio) { |
| switch (event) { |
| case VC_EVENT_READ_READY: |
| wvio->reenable(); |
| break; |
| case VC_EVENT_WRITE_READY: |
| rvio->reenable(); |
| break; |
| case VC_EVENT_WRITE_COMPLETE: |
| case VC_EVENT_EOS: |
| case VC_EVENT_READ_COMPLETE: |
| return TS_POP_CALL(TSMEMCACHE_STREAM_DONE, 0); |
| default: |
| return die(); |
| } |
| } else { |
| switch (event) { |
| case VC_EVENT_READ_READY: |
| if (cwvio) { |
| if (creader != reader && creader->read_avail() < cwvio->nbytes) { |
| int64_t a = reader->read_avail(); |
| if (a > static_cast<int64_t>(nbytes)) { |
| a = static_cast<int64_t>(nbytes); |
| } |
| if (a) { |
| cbuf->write(reader, a); |
| reader->consume(a); |
| } |
| } |
| cwvio->reenable(); |
| } |
| break; |
| case VC_EVENT_WRITE_READY: |
| if (crvio) { |
| crvio->reenable(); |
| } |
| break; |
| case VC_EVENT_WRITE_COMPLETE: |
| case VC_EVENT_READ_COMPLETE: |
| return TS_POP_CALL(TSMEMCACHE_STREAM_DONE, 0); |
| default: |
| return die(); |
| } |
| } |
| return EVENT_CONT; |
| } |
| |
| // cache to cache |
| int |
| MC::tunnel_event(int event, void *data) |
| { |
| MCDebug("tsmemcache", "tunnel %d %p crvio %p cwvio %p", event, data, crvio, cwvio); |
| if (data == crvio) { |
| switch (event) { |
| case VC_EVENT_READ_READY: |
| cwvio->reenable(); |
| break; |
| case VC_EVENT_EOS: |
| case VC_EVENT_READ_COMPLETE: |
| if (cwvio->nbytes == cwvio->ndone + cwvio->buffer.reader()->read_avail()) { |
| cwvio->reenable(); |
| return EVENT_CONT; |
| } |
| return TS_POP_CALL(TSMEMCACHE_TUNNEL_DONE, 0); |
| default: |
| return die(); |
| } |
| } else if (data == cwvio) { |
| switch (event) { |
| case VC_EVENT_WRITE_READY: |
| crvio->reenable(); |
| break; |
| case VC_EVENT_WRITE_COMPLETE: |
| case VC_EVENT_EOS: |
| return TS_POP_CALL(TSMEMCACHE_TUNNEL_DONE, 0); |
| default: |
| return die(); |
| } |
| } else { // network I/O |
| switch (event) { |
| case VC_EVENT_READ_READY: |
| case VC_EVENT_WRITE_READY: |
| case VC_EVENT_WRITE_COMPLETE: |
| case VC_EVENT_READ_COMPLETE: |
| return EVENT_CONT; |
| default: |
| return die(); |
| } |
| } |
| return EVENT_CONT; |
| } |
| |
| int |
| init_tsmemcache(int port) |
| { |
| tsmemcache_constants(); |
| MCAccept *a = new MCAccept; |
| a->mutex = new_ProxyMutex(); |
| NetProcessor::AcceptOptions options(NetProcessor::DEFAULT_ACCEPT_OPTIONS); |
| options.local_port = a->accept_port = port; |
| netProcessor.accept(a, options); |
| return 0; |
| } |
| |
| void |
| TSPluginInit(int argc, const char *argv[]) |
| { |
| ink_assert(sizeof(protocol_binary_request_header) == 24); |
| |
| TSPluginRegistrationInfo info; |
| info.plugin_name = (char *)"tsmemcache"; |
| info.vendor_name = (char *)"ats"; |
| info.support_email = (char *)"jplevyak@apache.org"; |
| |
| int port = 11211; |
| |
| if (TSPluginRegister(&info) != TS_SUCCESS) { |
| TSError("[PluginInit] tsmemcache registration failed.\n"); |
| goto error; |
| } |
| |
| if (argc < 2) { |
| TSError("[tsmemcache] Usage: tsmemcache.so [accept_port]\n"); |
| goto error; |
| } else { |
| int port = atoi(argv[1]); |
| if (!port) { |
| TSError("[tsmemcache] bad accept_port '%s'\n", argv[1]); |
| goto error; |
| } |
| MCDebug("tsmemcache", "using accept_port %d", port); |
| } |
| init_tsmemcache(port); |
| return; |
| |
| error: |
| TSError("[PluginInit] Plugin not initialized"); |
| } |