blob: a51d245e5856eced8c5e6fc19df6e9e590a4b964 [file] [log] [blame]
/** @file
Cache volume (Vol) and on-disk document (Doc) definitions, together with their inline helpers.
@section license License
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
#pragma once
#include <atomic>
// Cache block: the unit in which directory offsets and fragment sizes are expressed.
#define CACHE_BLOCK_SHIFT 9
#define CACHE_BLOCK_SIZE (1 << CACHE_BLOCK_SHIFT) // 512, smallest sector size
// Alignment helpers. Every macro argument is parenthesized so that expression
// arguments (e.g. `a + b`, `p + 1`) expand with the intended precedence.
#define ROUND_TO_STORE_BLOCK(_x) INK_ALIGN((_x), STORE_BLOCK_SIZE)
#define ROUND_TO_CACHE_BLOCK(_x) INK_ALIGN((_x), CACHE_BLOCK_SIZE)
#define ROUND_TO_SECTOR(_p, _x) INK_ALIGN((_x), (_p)->sector_size)
#define ROUND_TO(_x, _y) INK_ALIGN((_x), (_y))
// Vol (volumes)
#define VOL_MAGIC 0xF1D0F00D
#define START_BLOCKS 16 // 8k, STORE_BLOCK_SIZE
#define START_POS ((off_t)START_BLOCKS * CACHE_BLOCK_SIZE)
#define AGG_SIZE (4 * 1024 * 1024)     // 4MB
#define AGG_HIGH_WATER (AGG_SIZE / 2)  // 2MB
#define EVACUATION_SIZE (2 * AGG_SIZE) // 8MB
#define MAX_VOL_SIZE ((off_t)512 * 1024 * 1024 * 1024 * 1024)
#define STORE_BLOCKS_PER_CACHE_BLOCK (STORE_BLOCK_SIZE / CACHE_BLOCK_SIZE)
#define MAX_VOL_BLOCKS (MAX_VOL_SIZE / CACHE_BLOCK_SIZE)
#define MAX_FRAG_SIZE (AGG_SIZE - sizeof(Doc)) // true max
#define LEAVE_FREE DEFAULT_MAX_BUFFER_SIZE
#define PIN_SCAN_EVERY 16 // scan every 1/16 of disk
#define VOL_HASH_TABLE_SIZE 32707
#define VOL_HASH_EMPTY 0xFFFF
#define VOL_HASH_ALLOC_SIZE (8 * 1024 * 1024) // one chance per this unit
#define LOOKASIDE_SIZE 256
#define EVACUATION_BUCKET_SIZE (2 * EVACUATION_SIZE) // 16MB
#define RECOVERY_SIZE EVACUATION_SIZE                 // 8MB
#define AIO_NOT_IN_PROGRESS 0
#define AIO_AGG_WRITE_IN_PROGRESS (-1)
#define AUTO_SIZE_RAM_CACHE (-1) // 1-1 with directory size
#define DEFAULT_TARGET_FRAGMENT_SIZE (1048576 - sizeof(Doc)) // 1MB
// Map a directory offset (in cache blocks) to its evacuation bucket index; each
// bucket covers EVACUATION_BUCKET_SIZE bytes of the volume.
#define dir_offset_evac_bucket(_o) ((_o) / (EVACUATION_BUCKET_SIZE / CACHE_BLOCK_SIZE))
#define dir_evac_bucket(_e) dir_offset_evac_bucket(dir_offset(_e))
// Map a byte position inside volume `_d` to its evacuation bucket index by first
// converting it to a directory-style block offset.
#define offset_evac_bucket(_d, _o) dir_offset_evac_bucket((_d)->offset_to_vol_offset(_o))
// Documents
#define DOC_MAGIC ((uint32_t)0x5F129B13)
#define DOC_CORRUPT ((uint32_t)0xDEADBABE)
#define DOC_NO_CHECKSUM ((uint32_t)0xA0B0C0D0)
struct Cache;
struct Vol;
struct CacheDisk;
struct VolInitInfo;
struct DiskVol;
struct CacheVol;
// Header/footer record of a volume directory. Vol::dirlen() reserves space for one
// at the start (headerlen()) and one at the end (ROUND_TO_STORE_BLOCK(sizeof(VolHeaderFooter))).
// Field order defines the stored layout; do not reorder.
struct VolHeaderFooter {
unsigned int magic; // NOTE(review): presumably VOL_MAGIC when valid — confirm against the reader
ts::VersionNumber version;
time_t create_time;
off_t write_pos;      // current write position (compared against dir offsets in the vol_*_valid helpers)
off_t last_write_pos;
off_t agg_pos;        // aggregation position (see Vol::vol_out_of_phase_valid)
uint32_t generation; // token generation (vary), this cannot be 0
uint32_t phase;
uint32_t cycle;
uint32_t sync_serial;
uint32_t write_serial;
uint32_t dirty;
uint32_t sector_size;
uint32_t unused; // pad out to 8 byte boundary
// One freelist head per directory segment. Only the first is declared here;
// Vol::headerlen() adds room for the remaining (segments - 1) entries.
uint16_t freelist[1];
};
// Key and earliest key of one fragment that needs to be evacuated.
// Entries are chained through `link`; an EvacuationBlock embeds the head of the
// chain and free_EvacuationBlock() releases the rest.
struct EvacuationKey {
SLink<EvacuationKey> link;
CryptoHash key;
CryptoHash earliest_key;
};
// Bookkeeping for one directory location scheduled for evacuation.
// Blocks live on Vol::evacuate[] lists keyed by dir_evac_bucket() and are
// created/destroyed with new_EvacuationBlock()/free_EvacuationBlock().
struct EvacuationBlock {
// `init` overlays the flag bits so all of them can be cleared with a single store.
union {
unsigned int init;
struct {
unsigned int done : 1; // has been evacuated
unsigned int pinned : 1; // check pinning timeout
unsigned int evacuate_head : 1; // NOTE(review): presumably "evacuate the head fragment" — confirm at call sites
unsigned int unused : 29;
} f;
};
int readers;
Dir dir;     // directory entry being evacuated; evacuation_block_exists() matches on dir_offset(&dir)
Dir new_dir;
// we need to have a list of evacuationkeys because of collision.
EvacuationKey evac_frags;
CacheVC *earliest_evacuator;
LINK(EvacuationBlock, link);
};
// Per-volume cache state: the in-memory directory with its header/footer, the
// aggregation write buffer, evacuation bookkeeping and in-flight AIO state.
// Directory entries address data in CACHE_BLOCK_SIZE blocks relative to `start`,
// numbered from 1 (see vol_offset() / offset_to_vol_offset()).
struct Vol : public Continuation {
char *path = nullptr;
ats_scoped_str hash_text;
CryptoHash hash_id;
int fd = -1; // -1 until assigned (NOTE(review): presumably the backing device fd — confirm)
char *raw_dir = nullptr;
Dir *dir = nullptr; // base of the directory entries; segment s starts at dir_segment(s)
VolHeaderFooter *header = nullptr;
VolHeaderFooter *footer = nullptr;
int segments = 0; // number of directory segments
off_t buckets = 0; // buckets per segment; each bucket holds DIR_DEPTH entries
off_t recover_pos = 0;
off_t prev_recover_pos = 0;
off_t scan_pos = 0;
off_t skip = 0; // start of headers
off_t start = 0; // start of data
off_t len = 0;
off_t data_blocks = 0; // used as the wrap-around span in within_hit_evacuate_window()
int hit_evacuate_window = 0;
// io.aiocb.aio_fildes doubles as an "I/O in progress" flag; see is_io_in_progress().
AIOCallbackInternal io;
Queue<CacheVC, Continuation::Link_link> agg;
Queue<CacheVC, Continuation::Link_link> stat_cache_vcs;
Queue<CacheVC, Continuation::Link_link> sync;
char *agg_buffer = nullptr; // AGG_SIZE bytes, page aligned; owned (allocated in ctor, freed in dtor)
int agg_todo_size = 0;
int agg_buf_pos = 0; // bytes already placed in agg_buffer past header->write_pos
Event *trigger = nullptr; // pending event, cancelled by cancel_trigger()
OpenDir open_dir;
RamCache *ram_cache = nullptr;
int evacuate_size = 0;
DLL<EvacuationBlock> *evacuate = nullptr; // indexed by dir_evac_bucket()
DLL<EvacuationBlock> lookaside[LOOKASIDE_SIZE];
CacheVC *doc_evacuator = nullptr;
VolInitInfo *init_info = nullptr;
CacheDisk *disk = nullptr;
Cache *cache = nullptr;
CacheVol *cache_vol = nullptr;
uint32_t last_sync_serial = 0;
uint32_t last_write_serial = 0;
uint32_t sector_size = 0; // alignment used by ROUND_TO_SECTOR / round_to_approx_size()
bool recover_wrapped = false;
bool dir_sync_waiting = false;
bool dir_sync_in_progress = false;
bool writing_end_marker = false;
CacheKey first_fragment_key;
int64_t first_fragment_offset = 0;
Ptr<IOBufferData> first_fragment_data;
void cancel_trigger();
int recover_data();
int open_write(CacheVC *cont, int allow_if_writers, int max_writers);
int open_write_lock(CacheVC *cont, int allow_if_writers, int max_writers);
int close_write(CacheVC *cont);
int close_write_lock(CacheVC *cont);
int begin_read(CacheVC *cont);
int begin_read_lock(CacheVC *cont);
// unused read-write interlock code
// currently http handles a write-lock failure by retrying the read
OpenDirEntry *open_read(const CryptoHash *key);
OpenDirEntry *open_read_lock(CryptoHash *key, EThread *t);
int close_read(CacheVC *cont);
int close_read_lock(CacheVC *cont);
int clear_dir();
int init(char *s, off_t blocks, off_t dir_skip, bool clear);
int handle_dir_clear(int event, void *data);
int handle_dir_read(int event, void *data);
int handle_recover_from_data(int event, void *data);
int handle_recover_write_dir(int event, void *data);
int handle_header_read(int event, void *data);
int dir_init_done(int event, void *data);
int dir_check(bool fix);
int db_check(bool fix);
// True while io.aiocb.aio_fildes holds anything other than AIO_NOT_IN_PROGRESS.
int
is_io_in_progress()
{
return io.aiocb.aio_fildes != AIO_NOT_IN_PROGRESS;
}
// Advance header->generation, skipping 0, and return the new value.
// Must be called with this volume's mutex held (asserted).
int
increment_generation()
{
// this is stored in the offset field of the directory (!=0)
ink_assert(mutex->thread_holding == this_ethread());
header->generation++;
if (!header->generation)
header->generation++;
return header->generation;
}
// Mark the volume as idle by storing AIO_NOT_IN_PROGRESS in io.aiocb.aio_fildes.
void
set_io_not_in_progress()
{
io.aiocb.aio_fildes = AIO_NOT_IN_PROGRESS;
}
int aggWriteDone(int event, Event *e);
int aggWrite(int event, void *e);
void agg_wrap();
int evacuateWrite(CacheVC *evacuator, int event, Event *e);
int evacuateDocReadDone(int event, Event *e);
int evacuateDoc(int event, Event *e);
int evac_range(off_t start, off_t end, int evac_phase);
void periodic_scan();
void scan_for_pinned_documents();
void evacuate_cleanup_blocks(int i);
void evacuate_cleanup();
EvacuationBlock *force_evacuate_head(Dir *dir, int pinned);
int within_hit_evacuate_window(Dir *dir);
uint32_t round_to_approx_size(uint32_t l);
// inline functions
int headerlen(); // calculates the total length of the vol header and the freelist
int direntries(); // total number of dir entries
Dir *dir_segment(int s); // returns the first dir in the segment s
size_t dirlen(); // calculates the total length of header, directories and footer
int vol_out_of_phase_valid(Dir *e);
int vol_out_of_phase_agg_valid(Dir *e);
int vol_out_of_phase_write_valid(Dir *e);
int vol_in_phase_valid(Dir *e);
int vol_in_phase_agg_buf_valid(Dir *e);
off_t vol_offset(Dir *e); // byte position of the data addressed by directory entry e
off_t offset_to_vol_offset(off_t pos); // byte position -> 1-based block number
off_t vol_offset_to_offset(off_t pos); // 1-based block number -> byte position
off_t vol_relative_length(off_t start_offset);
// Creates a fresh mutex, shares it with open_dir, allocates and zeroes the
// page-aligned aggregation buffer, and installs aggWrite as the event handler.
Vol() : Continuation(new_ProxyMutex())
{
open_dir.mutex = mutex;
agg_buffer = (char *)ats_memalign(ats_pagesize(), AGG_SIZE);
memset(agg_buffer, 0, AGG_SIZE);
SET_HANDLER(&Vol::aggWrite);
}
// Releases the aggregation buffer allocated by the constructor.
~Vol() override { ats_memalign_free(agg_buffer); }
};
/// Continuation that receives disk-failure notifications.
/// It owns a fresh mutex and installs handle_disk_failure() as its event handler.
struct AIO_Callback_handler : public Continuation {
  int handle_disk_failure(int event, void *data);

  AIO_Callback_handler() : Continuation(new_ProxyMutex())
  {
    SET_HANDLER(&AIO_Callback_handler::handle_disk_failure);
  }
};
// A configured cache volume: a group of Vol objects (vols[0 .. num_vols-1]),
// together with the DiskVol pieces backing it and its statistics block.
struct CacheVol {
int vol_number = -1; // -1 until assigned
int scheme = 0;
off_t size = 0;
int num_vols = 0;
bool ramcache_enabled = true;
Vol **vols = nullptr;
DiskVol **disk_vols = nullptr;
LINK(CacheVol, link);
// per volume stats
RecRawStatBlock *vol_rsb = nullptr;
CacheVol() {}
};
// Fixed-size fragment header written in front of every fragment's data.
// Layout: [Doc][hlen bytes of header][data_len() bytes of data].
// Note : hdr() needs to be 8 byte aligned. hdr() is `this + sizeof(Doc)`, so
// sizeof(Doc) must stay a multiple of 8. Field order is the stored layout; do not reorder.
struct Doc {
uint32_t magic; // DOC_MAGIC
uint32_t len; // length of this fragment (including hlen & sizeof(Doc), unrounded)
uint64_t total_len; // total length of document
#if TS_ENABLE_FIPS == 1
// For FIPS CryptoHash is 256 bits vs. 128, and the 'first_key' must be checked first, so
// ensure that the new 'first_key' overlaps the old 'first_key' and that the rest of the data layout
// is the same by putting 'key' at the end.
CryptoHash first_key; ///< first key in object.
#else
CryptoHash first_key; ///< first key in object.
CryptoHash key; ///< Key for this doc.
#endif
uint32_t hlen; ///< Length of this header.
uint32_t doc_type : 8; ///< Doc type - indicates the format of this structure and its content.
uint32_t v_major : 8; ///< Major version number.
uint32_t v_minor : 8; ///< Minor version number.
uint32_t unused : 8; ///< Unused, forced to zero.
uint32_t sync_serial;
uint32_t write_serial;
uint32_t pinned; // pinned until
uint32_t checksum;
#if TS_ENABLE_FIPS == 1
CryptoHash key; ///< Key for this doc.
#endif
uint32_t data_len();      // bytes of payload in this fragment: len - sizeof(Doc) - hlen
uint32_t prefix_len();    // sizeof(Doc) + hlen
int single_fragment();    // non-zero when data_len() == total_len
int no_data_in_fragment();
char *hdr();              // first byte after the fixed Doc struct
char *data();             // first byte after the hlen-byte header
};
// Global Data
// gvol: table of volumes; gnvol: its entry count (std::atomic, presumably because
// it is read from multiple threads — confirm).
extern Vol **gvol;
extern std::atomic<int> gnvol;
// Allocators for OpenDir entries and evacuation bookkeeping
// (used by new_EvacuationBlock() / free_EvacuationBlock() below).
extern ClassAllocator<OpenDirEntry> openDirEntryAllocator;
extern ClassAllocator<EvacuationBlock> evacuationBlockAllocator;
extern ClassAllocator<EvacuationKey> evacuationKeyAllocator;
// NOTE(review): presumably VOL_HASH_TABLE_SIZE slots holding volume indices, with
// VOL_HASH_EMPTY (0xFFFF) marking unused slots — confirm against the builder.
extern unsigned short *vol_hash_table;
// inline Functions
/// Size of the volume header rounded up to a store block.
/// VolHeaderFooter already contains one freelist slot; each additional
/// directory segment needs one more uint16_t slot.
TS_INLINE int
Vol::headerlen()
{
  const auto extra_freelist_bytes = sizeof(uint16_t) * (this->segments - 1);
  return ROUND_TO_STORE_BLOCK(sizeof(VolHeaderFooter) + extra_freelist_bytes);
}
/// Address of the first directory entry of segment `s`.
/// Segments are stored back to back; each spans buckets * DIR_DEPTH entries of SIZEOF_DIR bytes.
TS_INLINE Dir *
Vol::dir_segment(int s)
{
  const auto segment_byte_offset = (s * this->buckets) * DIR_DEPTH * SIZEOF_DIR;
  char *const directory_base     = reinterpret_cast<char *>(this->dir);
  return reinterpret_cast<Dir *>(directory_base + segment_byte_offset);
}
/// Total on-disk size of the directory area: header (+ freelist), every
/// directory entry, and the trailing footer, each rounded to a store block.
TS_INLINE size_t
Vol::dirlen()
{
  const auto entry_bytes  = ((size_t)this->buckets) * DIR_DEPTH * this->segments * SIZEOF_DIR;
  const auto header_bytes = this->headerlen();
  return header_bytes + ROUND_TO_STORE_BLOCK(entry_bytes) + ROUND_TO_STORE_BLOCK(sizeof(VolHeaderFooter));
}
/// Number of directory entries across all segments.
TS_INLINE int
Vol::direntries()
{
  const auto entries_per_segment = this->buckets * DIR_DEPTH;
  return entries_per_segment * this->segments;
}
/// Non-zero when entry `e` (1-based block number) lies at or beyond the block
/// that contains header->agg_pos.
TS_INLINE int
Vol::vol_out_of_phase_valid(Dir *e)
{
  const auto agg_block = (this->header->agg_pos - this->start) / CACHE_BLOCK_SIZE;
  return dir_offset(e) - 1 >= agg_block;
}
/// Like vol_out_of_phase_valid(), but the threshold is pushed one full
/// aggregation buffer (AGG_SIZE) past header->agg_pos.
TS_INLINE int
Vol::vol_out_of_phase_agg_valid(Dir *e)
{
  const auto agg_limit_block = (this->header->agg_pos - this->start + AGG_SIZE) / CACHE_BLOCK_SIZE;
  return dir_offset(e) - 1 >= agg_limit_block;
}
/// Non-zero when entry `e` lies at or beyond the block containing header->write_pos.
TS_INLINE int
Vol::vol_out_of_phase_write_valid(Dir *e)
{
  const auto write_block = (this->header->write_pos - this->start) / CACHE_BLOCK_SIZE;
  return dir_offset(e) - 1 >= write_block;
}
/// Non-zero when entry `e` lies before the end of what has been written so far,
/// counting the bytes still pending in the aggregation buffer (agg_buf_pos).
TS_INLINE int
Vol::vol_in_phase_valid(Dir *e)
{
  const auto written_end_block = (this->header->write_pos + this->agg_buf_pos - this->start) / CACHE_BLOCK_SIZE;
  return dir_offset(e) - 1 < written_end_block;
}
/// Byte position of the data referenced by directory entry `e`: the entry's
/// 1-based block number converted through vol_offset_to_offset().
TS_INLINE off_t
Vol::vol_offset(Dir *e)
{
  return this->vol_offset_to_offset(static_cast<off_t>(dir_offset(e)));
}
/// Convert a byte position to a 1-based block number relative to `start`
/// (inverse of vol_offset_to_offset() for block-aligned positions).
TS_INLINE off_t
Vol::offset_to_vol_offset(off_t pos)
{
  const off_t bytes_into_data = pos - this->start;
  return (bytes_into_data + CACHE_BLOCK_SIZE) / CACHE_BLOCK_SIZE;
}
/// Convert a 1-based block number (as stored in directory entries) back to a
/// byte position; block 1 maps to `start`.
TS_INLINE off_t
Vol::vol_offset_to_offset(off_t pos)
{
  const off_t block_bytes = pos * CACHE_BLOCK_SIZE;
  return this->start + block_bytes - CACHE_BLOCK_SIZE;
}
/// Non-zero when the data for `e` falls inside the half-open byte window
/// [header->write_pos, header->write_pos + agg_buf_pos), i.e. the aggregation buffer.
TS_INLINE int
Vol::vol_in_phase_agg_buf_valid(Dir *e)
{
  const off_t position     = this->vol_offset(e);
  const off_t window_begin = this->header->write_pos;
  const off_t window_end   = window_begin + this->agg_buf_pos;
  return position >= window_begin && position < window_end;
}
// Length of the partition measured from `start_offset`, i.e. excluding the
// offset of location 0: (len + skip) - start_offset.
TS_INLINE off_t
Vol::vol_relative_length(off_t start_offset)
{
  const off_t partition_end = this->len + this->skip;
  return partition_end - start_offset;
}
/// Bytes preceding the payload: the fixed Doc struct plus its hlen-byte header.
TS_INLINE uint32_t
Doc::prefix_len()
{
  return hlen + sizeof(Doc);
}
/// Payload bytes in this fragment: the fragment length minus the prefix
/// (Doc struct + header), computed modulo 2^32 like the stored fields.
TS_INLINE uint32_t
Doc::data_len()
{
  const uint32_t prefix = this->prefix_len();
  return len - prefix;
}
/// Non-zero when this fragment carries the whole document (payload == total_len).
TS_INLINE int
Doc::single_fragment()
{
  const uint64_t payload = this->data_len();
  return payload == this->total_len ? 1 : 0;
}
/// Start of the variable-length header, which immediately follows the Doc struct.
TS_INLINE char *
Doc::hdr()
{
  Doc *const just_past_struct = this + 1;
  return reinterpret_cast<char *>(just_past_struct);
}
/// Start of the payload: `hlen` bytes past the start of the header.
TS_INLINE char *
Doc::data()
{
  char *const header_start = this->hdr();
  return &header_start[hlen];
}
// Free-function entry points operating on a volume; defined elsewhere.
// NOTE(review): parameters appear to mirror Vol::clear_dir() / Vol::init() — confirm.
int vol_dir_clear(Vol *d);
int vol_init(Vol *d, char *s, off_t blocks, off_t skip, bool clear);
// inline Functions
/// Find the EvacuationBlock in `p`'s evacuation bucket for `dir` whose entry has
/// the same directory offset as `dir`; returns nullptr when none is queued.
TS_INLINE EvacuationBlock *
evacuation_block_exists(Dir *dir, Vol *p)
{
  const auto wanted_offset = dir_offset(dir);
  for (EvacuationBlock *candidate = p->evacuate[dir_evac_bucket(dir)].head; candidate != nullptr;
       candidate                  = candidate->link.next) {
    if (dir_offset(&candidate->dir) == wanted_offset) {
      return candidate;
    }
  }
  return nullptr;
}
/// Cancel the pending `trigger` event, if any, and forget it.
TS_INLINE void
Vol::cancel_trigger()
{
  if (trigger == nullptr) {
    return;
  }
  trigger->cancel_action();
  trigger = nullptr;
}
/// Allocate an EvacuationBlock from thread `t`'s allocator and reset its bookkeeping.
/// Writing `init` clears every flag bit (done/pinned/evacuate_head) at once.
TS_INLINE EvacuationBlock *
new_EvacuationBlock(EThread *t)
{
  EvacuationBlock *block = THREAD_ALLOC(evacuationBlockAllocator, t);

  block->init                 = 0;
  block->readers              = 0;
  block->evac_frags.link.next = nullptr; // no chained keys beyond the embedded one
  block->earliest_evacuator   = nullptr;

  return block;
}
/// Release an EvacuationBlock: first every EvacuationKey chained after the
/// embedded `evac_frags` head, then the block itself back to thread `t`.
TS_INLINE void
free_EvacuationBlock(EvacuationBlock *b, EThread *t)
{
  for (EvacuationKey *key = b->evac_frags.link.next; key != nullptr;) {
    EvacuationKey *following = key->link.next; // read before the node is freed
    evacuationKeyAllocator.free(key);
    key = following;
  }
  THREAD_FREE(b, evacuationBlockAllocator, t);
}
/// Look up `key` in this volume's open-directory table; forwards to OpenDir::open_read().
TS_INLINE OpenDirEntry *
Vol::open_read(const CryptoHash *key)
{
  OpenDir &table = this->open_dir;
  return table.open_read(key);
}
/// Non-zero when entry `xdir` lies within `hit_evacuate_window` blocks ahead of
/// the write position plus one aggregation buffer. Entries behind that point count
/// only when they are close to a full `data_blocks` lap behind it (wrap-around).
TS_INLINE int
Vol::within_hit_evacuate_window(Dir *xdir)
{
  const off_t entry_block = dir_offset(xdir) - 1;
  const off_t write_block = (header->write_pos + AGG_SIZE - start) / CACHE_BLOCK_SIZE;
  const off_t ahead       = entry_block - write_block;

  if (ahead < 0) {
    const off_t behind = -ahead;
    return behind > (data_blocks - hit_evacuate_window) && behind < data_blocks;
  }
  return ahead < hit_evacuate_window;
}
/// Round `l` to the approximate size a directory entry can express,
/// then align the result to this volume's sector size.
TS_INLINE uint32_t
Vol::round_to_approx_size(uint32_t l)
{
  const uint32_t dir_rounded = round_to_approx_dir_size(l);
  return ROUND_TO_SECTOR(this, dir_rounded);
}