blob: eb408917557165513960802f670659dcc648d262 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
**/
#ifndef QUICKSTEP_STORAGE_STORAGE_MANAGER_HPP_
#define QUICKSTEP_STORAGE_STORAGE_MANAGER_HPP_
#include <atomic>
#include <chrono>
#include <cstddef>
#include <memory>
#include <string>
#include <unordered_map>
#include <vector>
#include "catalog/CatalogTypedefs.hpp"
#include "query_optimizer/QueryOptimizerConfig.h" // For QUICKSTEP_DISTRIBUTED
#include "storage/CountedReference.hpp"
#ifdef QUICKSTEP_DISTRIBUTED
#include "storage/DataExchange.grpc.pb.h"
#endif
#include "storage/EvictionPolicy.hpp"
#include "storage/FileManager.hpp"
#include "storage/StorageBlob.hpp"
#include "storage/StorageBlock.hpp"
#include "storage/StorageBlockInfo.hpp"
#include "storage/StorageConstants.hpp"
#include "threading/SpinSharedMutex.hpp"
#include "utility/Macros.hpp"
#include "utility/ShardedLockManager.hpp"
#include "gflags/gflags.h"
#include "gtest/gtest_prod.h"
#include "tmb/id_typedefs.h"
#ifdef QUICKSTEP_DISTRIBUTED
// Forward declaration: this header only names grpc::Channel through a
// std::shared_ptr (in the distributed-only DataExchangerClientAsync).
namespace grpc { class Channel; }
#endif
// Forward declaration: this header only handles tmb::MessageBus by pointer.
namespace tmb { class MessageBus; }
namespace quickstep {
// gflags declarations of flags that are defined elsewhere. FLAGS_block_domain
// and FLAGS_buffer_pool_slots supply the defaults used by the StorageManager
// constructors which do not take those values explicitly.
DECLARE_int32(block_domain);
DECLARE_uint64(buffer_pool_slots);
// Forward declarations of types which this header only uses by reference or
// by pointer.
class CatalogRelationSchema;
#ifdef QUICKSTEP_DISTRIBUTED
class PullResponse;
#endif
class StorageBlockLayout;
/** \addtogroup Storage
* @{
*/
/**
* @brief A class which manages block storage in memory and is responsible for
* creating, saving, and loading StorageBlock and StorageBlob instances.
**/
class StorageManager {
public:
/**
* @brief Constructor.
* @param storage_path The filesystem directory where blocks have persistent
* storage. This should end with a path-separator character.
* @exception CorruptPersistentStorage The storage directory layout is not
* in the expected format.
*/
explicit StorageManager(const std::string &storage_path)
: StorageManager(storage_path,
FLAGS_block_domain,
FLAGS_buffer_pool_slots,
LRUKEvictionPolicyFactory::ConstructLRUKEvictionPolicy(
2,
std::chrono::milliseconds(200))) {
}
/**
* @brief Constructor.
* @param storage_path The filesystem directory where blocks have persistent
* storage.
* @param max_memory_usage The maximum amount of memory that the storage
* manager should use for cached blocks in slots. If
* an block is requested that is not currently in
* memory and there are already max_memory_usage slots
* in use in memory, then the storage manager will
* attempt to evict enough blocks to make room for the
* requested block; if it cannot evict enough blocks,
* it will fetch the requested block anyway,
* temporarily going over the memory limit.
* @exception CorruptPersistentStorage The storage directory layout is not
* in the expected format.
**/
StorageManager(const std::string &storage_path,
const size_t max_memory_usage)
: StorageManager(storage_path,
FLAGS_block_domain,
max_memory_usage,
LRUKEvictionPolicyFactory::ConstructLRUKEvictionPolicy(
2,
std::chrono::milliseconds(200))) {
}
#ifdef QUICKSTEP_DISTRIBUTED
/**
* @brief Constructor.
* @param storage_path The filesystem directory where blocks have persistent
* storage.
* @param block_domain The unique block domain.
* @param block_locator_client_id The TMB client ID of the block locator.
* @param bus A pointer to the TMB.
*
* @exception CorruptPersistentStorage The storage directory layout is not
* in the expected format.
**/
StorageManager(const std::string &storage_path,
const block_id_domain block_domain,
const tmb::client_id block_locator_client_id,
tmb::MessageBus *bus)
: StorageManager(storage_path,
block_domain,
FLAGS_buffer_pool_slots,
LRUKEvictionPolicyFactory::ConstructLRUKEvictionPolicy(
2,
std::chrono::milliseconds(200)),
block_locator_client_id,
bus) {
}
#endif
/**
* @brief Constructor.
* @param storage_path The filesystem directory where blocks have persistent
* storage.
* @param block_domain The unique block domain.
* @param max_memory_usage The maximum amount of memory that the storage
* manager should use for cached blocks in slots. If
* an block is requested that is not currently in
* memory and there are already max_memory_usage slots
* in use in memory, then the storage manager will
* attempt to evict enough blocks to make room for the
* requested block; if it cannot evict enough blocks,
* it will fetch the requested block anyway,
* temporarily going over the memory limit.
* @param eviction_policy The eviction policy that the storage manager should
* use to manage the cache. The storage manager takes
* ownership of *eviction_policy.
* @param block_locator_client_id The TMB client ID of the block locator.
* @param bus A pointer to the TMB.
*
* @exception CorruptPersistentStorage The storage directory layout is not
* in the expected format.
**/
StorageManager(const std::string &storage_path,
const block_id_domain block_domain,
const size_t max_memory_usage,
EvictionPolicy *eviction_policy,
const tmb::client_id block_locator_client_id = tmb::kClientIdNone,
tmb::MessageBus *bus = nullptr);
/**
* @brief Destructor which also destroys all managed blocks.
**/
~StorageManager();
/**
* @brief Determine the number of slots needed to store the specified number
* of bytes.
* @note The specified number of slots may include some "extra" bytes, modulo
* the size of a slot in the memory pool.
* @note This is mainly intended to help create StorageBlobs of the
* appropriate size. StorageBlocks have internal structures and
* metadata which require additional storage beyond the "raw" bytes for
* tuples.
*
* @param bytes The desired number of bytes.
* @return The number of slots needed to store bytes.
**/
static std::size_t SlotsNeededForBytes(const std::size_t bytes) {
return (bytes + kSlotSizeBytes - 1) / kSlotSizeBytes;
}
/**
* @brief Determine the size of the memory pool managed by this
* StorageManager.
* @note This is provided for informational purposes and provides a snapshot
* of the memory pool size at the time it is called. The memory pool
* may grow as needed during a call to createBlock() or loadBlock().
*
* @return The amount of allocated memory managed by this StorageManager in
* bytes.
**/
std::size_t getMemorySize() const {
return kSlotSizeBytes * total_memory_usage_;
}
/**
* @brief Return the upper limit on the number of buffer pool slots that the
* StorageManager can allocate. This number is specified during the
* initialization of the StorageManager. The size of each slot is
* kSlotSizeBytes.
* @note This information is provided for informational purposes. The memory
* pool may grow larger than this upper limite temporarily, depending
* on the path that is followed in a call to createBlock() or
* loadBlock().
*
* @return The number of buffer pool slots managed by this StorageManager.
**/
std::size_t getMaxBufferPoolSlots() const {
return max_memory_usage_;
}
/**
* @brief Create a new empty block.
*
* @param relation The relation which the new block will belong to (you must
* also call addBlock() on the relation).
* @param layout The StorageBlockLayout to use for the new block.
* @param numa_node The NUMA node on which the block should be created. The
* default value is -1 and it means that the Catalog
* Relation has no NUMAPlacementScheme associated with it
* and hence the block will be created as per the OS policy.
* @return The id of the newly-created block.
**/
block_id createBlock(const CatalogRelationSchema &relation,
const StorageBlockLayout &layout,
const int numa_node = -1);
/**
* @brief Create a new StorageBlob. The blob memory will initially be
* zero-filled.
*
* @param num_slots The size of the StorageBlob in slots. Must not exceed
* kAllocationChunkSizeSlots.
* @param numa_node The NUMA node on which the blob should be created. The
* default value is -1 and it means that the blob will be
* created as per the default OS policy.
* @return The id of the newly-created blob.
**/
block_id createBlob(const std::size_t num_slots, const int numa_node = -1);
/**
* @brief Check whether a StorageBlock or StorageBlob is loaded into memory.
* @note This is provided for informational purposes and determines if the
* specified block is loaded at the moment the method is called. It is
* possible for the block to be loaded by a concurrent thread
* immediately after this method returns.
*
* @param block The id of the block.
* @return Whether the block with the specified id is in memory.
**/
bool blockOrBlobIsLoaded(const block_id block) const;
/**
* @brief Save a block or blob in memory to the persistent storage.
*
* @param block The id of the block or blob to save.
* @param force Force the block to the persistent storage, even if it is not
* dirty (by default, only actually write dirty blocks to the
* persistent storage).
* @exception UnableToOpenFile The block's persistent storage file couldn't
* be opened for writing.
* @exception FileWriteError An IO error occurred while writing the block's
* persistent storage file.
*
* @return False if the block is not found in the memory. True if the block is
* successfully saved to the persistent storage OR the block is clean
* and force is false.
**/
bool saveBlockOrBlob(const block_id block, const bool force = false);
/**
* @brief Delete a block or blob's file in the persistent storage. The block
* is automatically evicted.
*
* @param block The id of the block whose file will be deleted.
**/
void deleteBlockOrBlobFile(const block_id block);
/**
* @brief Get a block. If the block is not in memory it will be loaded.
*
* @param block The id of the block to get.
* @param relation The Catalog Relation this block belongs to.
* @param numa_node The NUMA node for placing this block. If the block is
* already present in the buffer pool, this is ignored.
* If set to -1, the default OS memory-allocation policy
* will be used.
* @return The block with the given id.
* @exception OutOfMemory The system has run out of memory.
**/
BlockReference getBlock(const block_id block,
const CatalogRelationSchema &relation,
const int numa_node = -1) {
return BlockReference(getBlockInternal(block, relation, numa_node));
}
/**
* @brief Get a mutable pointer to a block. If the block is not in memory it
* will be loaded.
*
* @param block The id of the block to get.
* @param relation The Catalog Relation this block belongs to.
* @param numa_node The NUMA node for placing this block. If the block is
* already present in the buffer pool, this is ignored.
* If set to -1, the default OS memory-allocation policy
* will be used.
* @return The block with the given id.
* @exception OutOfMemory The system has run out of memory.
**/
MutableBlockReference getBlockMutable(const block_id block,
const CatalogRelationSchema &relation,
const int numa_node = -1) {
return getBlockInternal(block, relation, numa_node);
}
/**
* @brief Get a blob. If the block is not in memory it will be loaded.
*
* @param block The id of the blob to get.
* @param numa_node The NUMA node for placing this blob. If the blob is
* already present in the buffer pool, this is ignored.
* If set to -1, the default OS memory-allocation policy
* will be used.
* @return The blob with the given id.
* @exception OutOfMemory The system has run out of memory.
**/
BlobReference getBlob(const block_id blob, const int numa_node = -1) {
return BlobReference(getBlobInternal(blob, numa_node));
}
/**
* @brief Get a mutable pointer to a blob. If the block is not in memory it
* will be loaded.
*
* @param block The id of the blob to get.
* @param numa_node The NUMA node for placing this blob. If the blob is
* already present in the buffer pool, this is ignored.
* If set to -1, the default OS memory-allocation policy
* will be used.
* @return The blob with the given id.
* @exception OutOfMemory The system has run out of memory.
**/
MutableBlobReference getBlobMutable(const block_id blob,
const int numa_node = -1) {
return getBlobInternal(blob, numa_node);
}
/**
* @brief Check if a block or blob is loaded in memory AND is dirty.
*
* @param block The id of the block or blob.
* @return True if it's both loaded and dirty, false otherwise.
**/
bool blockOrBlobIsLoadedAndDirty(const block_id block);
#ifdef QUICKSTEP_DISTRIBUTED
/**
* @brief Send BlockDomainToShiftbossIndexMessage to BlockLocator so that
* ForemanDistributed could take advantages of block locality info
* for a better scheduling policy.
*
* @param shiftboss_index The Shiftboss index.
**/
void sendBlockDomainToShiftbossIndexMessage(const std::size_t shiftboss_index);
/**
* @brief Pull a block or a blob. Used by DataExchangerAsync.
*
* @param block The id of the block or blob.
* @param response Where to store the pulled block content.
**/
void pullBlockOrBlob(const block_id block, PullResponse *response) const;
#endif
/**
* @brief Get the HDFS connector via libhdfs3.
*
* @return The HDFS connector.
**/
void* hdfs();
private:
struct BlockHandle {
void *block_memory;
std::size_t block_memory_size; // size of block_memory in slots
StorageBlockBase *block;
};
#ifdef QUICKSTEP_DISTRIBUTED
/**
* @brief A class which connects to DataExchangerAsync to exchange data from
* remote peers.
**/
class DataExchangerClientAsync {
public:
/**
* @brief Constructor.
*
* @param channel The RPC channel to connect DataExchangerAsync.
* @param storage_manager The StorageManager to use.
*/
DataExchangerClientAsync(const std::shared_ptr<grpc::Channel> &channel,
StorageManager *storage_manager);
/**
* @brief Pull a block or blob from a remote StorageManager.
*
* @param block The block or blob to pull.
* @param numa_node The NUMA node for placing this block.
* @param block_handle Where the pulled block or blob stores.
*
* @return Whether the pull operation is successful or not.
*/
bool Pull(const block_id block,
const numa_node_id numa_node,
BlockHandle *block_handle);
private:
std::unique_ptr<DataExchange::Stub> stub_;
StorageManager *storage_manager_;
DISALLOW_COPY_AND_ASSIGN(DataExchangerClientAsync);
};
/**
* @brief Get the network info of the given block domain.
*
* @param block_domain The domain of block or blob to pull.
*
* @return The network info of the given block domain.
**/
std::string getPeerDomainNetworkAddress(const block_id_domain block_domain);
/**
* @brief Update the block location info in BlockLocator.
*
* @param block The given block or blob.
* @param message_type Indicate whether to add or delete a block location.
**/
void sendBlockLocationMessage(const block_id block,
const tmb::message_type_id message_type);
#endif
// Helper for createBlock() and createBlob(). Allocates a block ID and memory
// slots for a new StorageBlock or StorageBlob. Returns the allocated ID and
// writes the allocated slot range into 'handle->slot_index_low' and
// 'handle->slot_index_high'.
block_id allocateNewBlockOrBlob(const std::size_t num_slots,
BlockHandle *handle,
const int numa_node);
// Helper for loadBlock() and loadBlob(). Allocates memory slots for the
// loaded block or blob and reads raw data from the persistent storage into
// memory. Returns a partially-constructed BlockHandle which has
// 'slot_index_low' and 'slot_index_high' properly set to indicate the memory
// slots which raw data has been read into, but for which a StorageBlob or
// StorageBlock object has not yet been constructed.
BlockHandle loadBlockOrBlob(const block_id block, const int numa_node);
// Helper for loadBlock() and loadBlob(). Inserts an entry (block, handle)
// into 'blocks_'. If there is already an entry in 'blocks_' for 'block',
// deletes the blob or block in 'handle', frees the slots which it occupied.
void insertBlockHandleAfterLoad(const block_id block,
const BlockHandle &handle);
// Allocate a buffer to hold the specified number of slots. The memory
// returned will be zeroed-out, and mapped using large pages if the system
// supports it.
void* allocateSlots(const std::size_t num_slots,
const int numa_node);
// Deallocate a buffer allocated by allocateSlots(). This must be used
// instead of free(), because the underlying implementation of
// allocateSlots() may use mmap instead of malloc.
void deallocateSlots(void *slots,
const std::size_t num_slots);
/**
* @brief Evict a block or blob from memory.
* @note The block is NOT automatically saved, so call saveBlock() first if
* necessary.
*
* @param block The id of the block to evict.
* @exception BlockNotFoundInMemory The block with the specified id is not
* in memory.
**/
void evictBlockOrBlob(const block_id block);
/**
* @brief Do the work that is common to getBlock and getBlockMutable, namely,
* actually loading the block if necessary.
* @param block The block to be acquired.
* @param relation The relation the block is a part of.
* @param numa_node The NUMA node where the block is to be loaded.
* @return A CountedReference to the block.
* @exception OutOfMemory The system has run out of memory.
*/
MutableBlockReference getBlockInternal(const block_id block,
const CatalogRelationSchema &relation,
const int numa_node);
/**
* @brief Do the work that is common to getBlob and getBlobMutable, namely,
* actually loading the block if necessary.
* @param blob The blob to be acquired.
* @param numa_node The NUMA node where the blob is to be loaded.
* @return A CountedReference to the blob.
* @exception OutOfMemory The system has run out of memory.
*/
MutableBlobReference getBlobInternal(const block_id blob,
const int numa_node);
/**
* @brief Evict blocks or blobs until there is enough space for a new block
* or blob of the requested size.
*
* @note This non-blocking method gives up evictions if there is a shard
* collision, and thus the buffer pool size may temporarily go beyond
* the memory limit.
*
* @param slots Number of slots to make room for.
*/
void makeRoomForBlockOrBlob(const std::size_t slots);
/**
* @brief Load a block from the persistent storage into memory.
*
* @param block The id of the block to load.
* @param relation The relation which the block belongs to.
* @param numa_node The NUMA node where the block is to be loaded.
* @return The StorageBlock that was loaded.
* @exception BlockNotFoundInPersistentStorage The block with the specified
* id can't be found on the persistent storage.
* @exception CorruptPersistentStorage The storage directory layout is not
* in the expected format.
* @exception OutOfMemory The system has run out of memory.
* @exception UnableToOpenFile The block's persistent storage file couldn't be
* opened for reading.
**/
StorageBlock* loadBlock(const block_id block,
const CatalogRelationSchema &relation,
const int numa_node);
/**
* @brief Load a StorageBlob from the persistent storage into memory.
*
* @param blob The ID of the blob to load.
* @param numa_node The NUMA node where the blob is to be loaded.
*
* @exception BlockNotFoundInPersistentStorage The block with the specified
* id can't be found on the persistent storage.
* @exception CorruptPersistentStorage The storage directory layout is not
* in the expected format.
* @exception OutOfMemory The system has run out of memory.
* @exception UnableToOpenFile The block's persistent storage file couldn't be
* opened for reading.
**/
StorageBlob* loadBlob(const block_id blob, const int numa_node);
// File system path where block files are stored. Fixed when StorageManager
// is created.
const std::string storage_path_;
// The current memory usage of all storage blocks and blobs in slots.
std::atomic<size_t> total_memory_usage_;
// The maximum allowed memory usage of all storage blocks and blobs in slots.
const size_t max_memory_usage_;
std::unique_ptr<EvictionPolicy> eviction_policy_;
#ifdef QUICKSTEP_DISTRIBUTED
const block_id_domain block_domain_;
tmb::client_id storage_manager_client_id_;
#endif
const tmb::client_id block_locator_client_id_;
tmb::MessageBus *bus_;
std::unique_ptr<FileManager> file_manager_;
// Used to generate unique IDs in allocateNewBlockOrBlob().
std::atomic<block_id> block_index_;
// Directory of in-memory blocks. Read by blockIsLoaded(), saveBlock(),
// getBlock()/getBlockMutable() and blockOrBlobIsLoadedAndDirty().
// Modified by createBlock(), loadBlock(), and evictBlock().
//
// TODO(chasseur): Look into concurrent map implementations that allow
// finer-grained locking.
std::unordered_map<block_id, BlockHandle> blocks_;
alignas(kCacheLineBytes) mutable SpinSharedMutex<false> blocks_shared_mutex_;
// Used to pull a remote block.
std::unordered_map<block_id_domain, std::string> block_domain_network_addresses_;
alignas(kCacheLineBytes) mutable SpinSharedMutex<false> block_domain_network_addresses_shared_mutex_;
// This lock manager is used with the following contract:
// (1) A block cannot be evicted unless an exclusive lock is held on its
// lock shard.
// (2) If it is not safe to evict a block, then either that block's
// reference count is greater than 0 or a shared lock is held on the
// block's lock shard.
// TODO(jmp): Would be good to set this more intelligently in the future
// based on the hardware concurrency, the amount of main memory
// and slot size. For now pick the largest prime that is less
// than 8K.
static constexpr std::size_t kLockManagerNumShards = 0x2000-1;
ShardedLockManager<block_id, kLockManagerNumShards, SpinSharedMutex<false>> lock_manager_;
friend class BlockLocatorTest;
FRIEND_TEST(BlockLocatorTest, BlockTest);
FRIEND_TEST(BlockLocatorTest, BlobTest);
FRIEND_TEST(StorageManagerTest, DifferentNUMANodeBlobTestWithEviction);
FRIEND_TEST(StorageManagerTest, EvictFromSameShardTest);
DISALLOW_COPY_AND_ASSIGN(StorageManager);
};
/** @} */
} // namespace quickstep
#endif // QUICKSTEP_STORAGE_STORAGE_MANAGER_HPP_