| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| #include "MetaDataHelper.h" |
| #include <filesystem> |
| #include <Core/Settings.h> |
| #include <Disks/ObjectStorages/MetadataStorageFromDisk.h> |
| #include <Interpreters/Context.h> |
| #include <Storages/MergeTree/MergeSparkMergeTreeTask.h> |
| #include <Poco/StringTokenizer.h> |
| #include <Common/QueryContext.h> |
| |
| namespace CurrentMetrics |
| { |
| extern const Metric LocalThread; |
| extern const Metric LocalThreadActive; |
| extern const Metric LocalThreadScheduled; |
| } |
| |
| namespace DB |
| { |
| namespace Setting |
| { |
| extern const SettingsSeconds lock_acquire_timeout; |
| extern const SettingsMaxThreads max_threads; |
| } |
| namespace ErrorCodes |
| { |
| extern const int NOT_IMPLEMENTED; |
| } |
| } |
| |
| using namespace DB; |
| |
| namespace local_engine |
| { |
| static const String METADATA_FILE_NAME = "metadata.gluten"; |
| |
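// Parse a metadata.gluten file into a map of (file name -> file content).
// Each entry is serialized as "<name>\t<content size>\n" followed by exactly
// <content size> bytes of content; entries are concatenated until EOF.
// This is the inverse of the format written by saveFileStatus() below.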
| std::unordered_map<String, String> extractPartMetaData(ReadBuffer & in) |
| { |
| std::unordered_map<String, String> result; |
| while (!in.eof()) |
| { |
| String name; |
| readString(name, in); |
| assertChar('\t', in); |
| UInt64 size; |
| readIntText(size, in); |
| assertChar('\n', in); |
| String data; |
| data.resize(size); |
        in.readStrict(data.data(), size); // fail loudly on a truncated file instead of silently accepting a short read
| result.emplace(name, data); |
| } |
| return result; |
| } |
| |
| enum SupportedMetaDataStorageType |
| { |
| UNKNOWN = 0, |
| ROCKSDB, |
| LOCAL |
| }; |
| |
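// Primary template: only the ROCKSDB and LOCAL specializations below are
// implemented; any other instantiation is a programming error.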
| template <SupportedMetaDataStorageType type> |
| static void |
| restoreMetaData(const SparkStorageMergeTreePtr & storage, const MergeTreeTableInstance & mergeTreeTable, const Context & context) |
| { |
| UNREACHABLE(); |
| } |
| |
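// RocksDB-backed metadata: download each missing part's metadata.gluten from
// object storage and replay its entries into the metadata storage through a
// single transaction, so the restore is committed atomically.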
| template <> |
| void restoreMetaData<ROCKSDB>( |
| const SparkStorageMergeTreePtr & storage, const MergeTreeTableInstance & mergeTreeTable, const Context & context) |
| { |
| auto data_disk = storage->getStoragePolicy()->getAnyDisk(); |
| std::unordered_set<String> not_exists_part; |
| auto metadata_storage = data_disk->getMetadataStorage(); |
| auto table_path = std::filesystem::path(mergeTreeTable.relative_path); |
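    // Collect the parts whose metadata directories are missing locally.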
| for (const auto & part : mergeTreeTable.getPartNames()) |
| { |
| auto part_path = table_path / part; |
| if (!metadata_storage->existsDirectory(part_path)) |
| not_exists_part.emplace(part); |
| } |
| |
| if (auto lock = storage->lockForAlter(context.getSettingsRef()[Setting::lock_acquire_timeout])) |
| { |
        // Keep this early return inside lockForAlter: even when there is nothing
        // to restore, acquiring the lock first ensures we do not return while
        // another thread is still restoring.
| if (not_exists_part.empty()) |
| return; |
| |
| auto s3 = data_disk->getObjectStorage(); |
| auto transaction = metadata_storage->createTransaction(); |
| |
| if (!metadata_storage->existsDirectory(table_path)) |
| transaction->createDirectoryRecursive(table_path.generic_string()); |
| |
| for (const auto & part : not_exists_part) |
| { |
| auto part_path = table_path / part; |
| auto metadata_file_path = part_path / METADATA_FILE_NAME; |
| |
            // Skip parts already restored by a previous lock holder; keep
            // restoring the rest so the transaction still gets committed.
            if (metadata_storage->existsDirectory(part_path))
                continue;
            transaction->createDirectoryRecursive(part_path.generic_string());
| auto key = s3->generateObjectKeyForPath(metadata_file_path.generic_string(), std::nullopt); |
| StoredObject metadata_object(key.serialize()); |
| auto read_settings = ReadSettings{}; |
| read_settings.enable_filesystem_cache = false; |
| auto part_metadata = extractPartMetaData(*s3->readObject(metadata_object, read_settings)); |
| for (const auto & item : part_metadata) |
| { |
| auto item_path = part_path / item.first; |
| transaction->writeStringToFile(item_path, item.second); |
| } |
| } |
| transaction->commit(); |
| } |
| } |
| |
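// Local-disk metadata: same recovery as the RocksDB path, but entries are
// written as plain files on the metadata disk, and missing parts are restored
// concurrently on a thread pool.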
| template <> |
| void restoreMetaData<LOCAL>( |
| const SparkStorageMergeTreePtr & storage, const MergeTreeTableInstance & mergeTreeTable, const Context & context) |
| { |
| const auto data_disk = storage->getStoragePolicy()->getAnyDisk(); |
| std::unordered_set<String> not_exists_part; |
| const DB::MetadataStorageFromDisk * metadata_storage = static_cast<MetadataStorageFromDisk *>(data_disk->getMetadataStorage().get()); |
| const auto metadata_disk = metadata_storage->getDisk(); |
| const auto table_path = std::filesystem::path(mergeTreeTable.relative_path); |
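    // As in the RocksDB path: collect the parts whose metadata is missing.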
| for (const auto & part : mergeTreeTable.getPartNames()) |
| { |
| auto part_path = table_path / part; |
| if (!metadata_disk->existsDirectory(part_path)) |
| not_exists_part.emplace(part); |
| } |
| |
| if (auto lock = storage->lockForAlter(context.getSettingsRef()[Setting::lock_acquire_timeout])) |
| { |
        // Keep this early return inside lockForAlter: even when there is nothing
        // to restore, acquiring the lock first ensures we do not return while
        // another thread is still restoring.
| if (not_exists_part.empty()) |
| return; |
| |
        // Restore missing parts concurrently to speed up metadata recovery: use
        // at least 10 threads (or max_threads, if larger), capped by the number
        // of missing parts.
        auto max_concurrency = std::max(static_cast<UInt64>(10), QueryContext::globalContext()->getSettingsRef()[Setting::max_threads].value);
        auto max_threads = std::min(max_concurrency, static_cast<UInt64>(not_exists_part.size()));
| FreeThreadPool thread_pool( |
| CurrentMetrics::LocalThread, |
| CurrentMetrics::LocalThreadActive, |
| CurrentMetrics::LocalThreadScheduled, |
| max_threads, |
| max_threads, |
| not_exists_part.size()); |
| auto s3 = data_disk->getObjectStorage(); |
| |
| if (!metadata_disk->existsDirectory(table_path)) |
| metadata_disk->createDirectories(table_path.generic_string()); |
| |
| for (const auto & part : not_exists_part) |
| { |
            // Capture `part` by value: the job may run after this loop iteration
            // ends, so capturing the loop variable by reference would dangle.
            auto job = [&, part]()
| { |
| auto part_path = table_path / part; |
| auto metadata_file_path = part_path / METADATA_FILE_NAME; |
| |
                // Skip parts already restored by a previous call.
                if (metadata_disk->existsDirectory(part_path))
                    return;
                metadata_disk->createDirectories(part_path);
| auto key = s3->generateObjectKeyForPath(metadata_file_path.generic_string(), std::nullopt); |
| StoredObject metadata_object(key.serialize()); |
| auto read_settings = ReadSettings{}; |
| read_settings.enable_filesystem_cache = false; |
| auto part_metadata = extractPartMetaData(*s3->readObject(metadata_object, read_settings)); |
| for (const auto & item : part_metadata) |
| { |
| auto item_path = part_path / item.first; |
| auto out = metadata_disk->writeFile(item_path); |
| out->write(item.second.data(), item.second.size()); |
| out->finalize(); |
| } |
| }; |
| thread_pool.scheduleOrThrow(job); |
| } |
| thread_pool.wait(); |
| } |
| } |
| |
| |
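// Returns true if `file_name` refers to a part's metadata.gluten dump.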
| bool isMergeTreePartMetaDataFile(const String & file_name) |
| { |
| return file_name.ends_with(METADATA_FILE_NAME); |
| } |
| |
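// Restore missing part metadata for a table whose data lives on a remote disk,
// dispatching on the metadata storage type; non-remote disks need no restore.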
| void restoreMetaData(const SparkStorageMergeTreePtr & storage, const MergeTreeTableInstance & mergeTreeTable, const Context & context) |
| { |
| const auto data_disk = storage->getStoragePolicy()->getAnyDisk(); |
| if (!data_disk->isRemote()) |
| return; |
| auto metadata_storage = data_disk->getMetadataStorage(); |
| if (metadata_storage->getType() == MetadataStorageType::Local) |
| restoreMetaData<LOCAL>(storage, mergeTreeTable, context); |
    // MetadataStorageType::None is reported by the RocksDB-backed metadata storage
| else if (metadata_storage->getType() == MetadataStorageType::None) |
| restoreMetaData<ROCKSDB>(storage, mergeTreeTable, context); |
| else |
| throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Unsupported metadata storage type {}.", metadata_storage->getType()); |
| } |
| |
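// Dump every metadata file of a freshly written part into a single
// metadata.gluten file inside the part, so the metadata can later be
// recovered via restoreMetaData().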
| void saveFileStatus( |
| const DB::MergeTreeData & storage, const DB::ContextPtr & context, const String & part_name, IDataPartStorage & data_part_storage) |
| { |
| const DiskPtr disk = storage.getStoragePolicy()->getAnyDisk(); |
| if (!disk->isRemote()) |
| return; |
| auto meta_storage = disk->getMetadataStorage(); |
| const auto out = data_part_storage.writeFile(METADATA_FILE_NAME, DBMS_DEFAULT_BUFFER_SIZE, context->getWriteSettings()); |
| for (const auto it = data_part_storage.iterate(); it->isValid(); it->next()) |
| { |
| auto content = meta_storage->readFileToString(it->path()); |
| writeString(it->name(), *out); |
| writeChar('\t', *out); |
| writeIntText(content.length(), *out); |
| writeChar('\n', *out); |
| writeString(content, *out); |
| } |
| out->finalize(); |
| |
    LOG_DEBUG(&Poco::Logger::get("MetaDataHelper"), "Saved metadata of part {} successfully.", part_name);
| } |
| |
| |
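// Merge the selected parts into a single new part whose name is derived from
// the partition directory, the bucket directory, and `new_part_uuid`, then
// load and return the merged part. The merge task runs synchronously.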
| MergeTreeDataPartPtr mergeParts( |
| std::vector<DB::DataPartPtr> selected_parts, |
| const String & new_part_uuid, |
| SparkStorageMergeTree & storage, |
| const String & partition_dir, |
| const String & bucket_dir) |
| { |
| auto future_part = std::make_shared<DB::FutureMergedMutatedPart>(); |
| future_part->uuid = UUIDHelpers::generateV4(); |
| |
| future_part->assign(std::move(selected_parts), /*patch_parts_=*/ {}); |
| future_part->part_info = MergeListElement::FAKE_RESULT_PART_FOR_PROJECTION; |
| |
| //TODO: name |
| future_part->name = partition_dir.empty() ? "" : partition_dir + "/"; |
| if (!bucket_dir.empty()) |
| future_part->name = future_part->name + bucket_dir + "/"; |
| future_part->name = future_part->name + new_part_uuid + "-merged"; |
| |
| auto entry = std::make_shared<DB::MergeMutateSelectedEntry>( |
| future_part, DB::CurrentlyMergingPartsTaggerPtr{}, std::make_shared<DB::MutationCommands>()); |
| |
    // Deduplication is disabled for this merge: `deduplicate` is false and
    // `deduplicate_by_columns` is empty.
    DB::IExecutableTask::TaskResultCallback f = [](bool) { };
    const auto task = std::make_shared<MergeSparkMergeTreeTask>(
        storage, storage.getInMemoryMetadataPtr(), false, std::vector<std::string>{}, false, entry, DB::TableLockHolder{}, f);
| |
| task->setCurrentTransaction(DB::MergeTreeTransactionHolder{}, DB::MergeTreeTransactionPtr{}); |
| |
| while (task->executeStep()) |
| { |
| } |
| |
| std::vector<MergeTreeDataPartPtr> merged = storage.loadDataPartsWithNames({future_part->name}); |
| assert(merged.size() == 1); |
| return merged[0]; |
| } |
| |
| } |