/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "SparkStorageMergeTree.h"
#include <Disks/ObjectStorages/CompactObjectStorageDiskTransaction.h>
#include <Disks/SingleDiskVolume.h>
#include <Interpreters/ExpressionActions.h>
#include <Interpreters/MergeTreeTransaction.h>
#include <Storages/MergeTree/DataPartStorageOnDiskFull.h>
#include <Storages/MergeTree/MergeTreeSettings.h>
#include <Storages/MergeTree/SparkMergeTreeSink.h>
#include <Storages/MergeTree/checkDataPart.h>
namespace ProfileEvents
{
extern const Event LoadedDataParts;
extern const Event LoadedDataPartsMicroseconds;
}
namespace DB
{
namespace MergeTreeSetting
{
extern const MergeTreeSettingsBool assign_part_uuids;
extern const MergeTreeSettingsFloat ratio_of_defaults_for_sparse_serialization;
extern const MergeTreeSettingsBool fsync_part_directory;
extern const MergeTreeSettingsBool fsync_after_insert;
}
namespace ErrorCodes
{
extern const int DUPLICATE_DATA_PART;
extern const int NO_SUCH_DATA_PART;
}
}
namespace local_engine
{
using namespace DB;
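/// Builds a ReadFromMergeTree::AnalysisResult directly from a pre-computed set of ranges,
/// bypassing ClickHouse's own index analysis: the caller has already chosen which parts and
/// mark ranges to read, so this only aggregates the statistics (parts, ranges, marks, rows)
/// the query pipeline expects and installs the result on the source step.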
void SparkStorageMergeTree::analysisPartsByRanges(DB::ReadFromMergeTree & source, const DB::RangesInDataParts & ranges_in_data_parts)
{
ReadFromMergeTree::AnalysisResult result;
result.column_names_to_read = source.getAllColumnNames();
/// If there are only virtual columns in the query, you must request at least one non-virtual one.
if (result.column_names_to_read.empty())
{
NamesAndTypesList available_real_columns = source.getStorageMetadata()->getColumns().getAllPhysical();
result.column_names_to_read.push_back(ExpressionActions::getSmallestColumn(available_real_columns).name);
}
result.sampling = MergeTreeDataSelectSamplingData();
result.parts_with_ranges = ranges_in_data_parts;
size_t sum_marks = 0;
size_t sum_ranges = 0;
size_t sum_rows = 0;
size_t total_marks_pk = 0;
size_t sum_marks_pk = 0;
for (const auto & part : result.parts_with_ranges)
{
sum_ranges += part.ranges.size();
sum_marks += part.getMarksCount();
sum_rows += part.getRowsCount();
total_marks_pk += part.data_part->index_granularity->getMarksCountWithoutFinal();
for (auto range : part.ranges)
sum_marks_pk += range.getNumberOfMarks();
}
result.total_parts = ranges_in_data_parts.size();
result.parts_before_pk = ranges_in_data_parts.size();
result.selected_parts = ranges_in_data_parts.size();
result.selected_ranges = sum_ranges;
result.selected_marks = sum_marks;
result.selected_marks_pk = sum_marks_pk;
result.total_marks_pk = total_marks_pk;
result.selected_rows = sum_rows;
if (source.getQueryInfo().input_order_info)
result.read_type
= (source.getQueryInfo().input_order_info->direction > 0) ? MergeTreeReadType::InOrder : MergeTreeReadType::InReverseOrder;
source.setAnalyzedResult(std::make_shared<ReadFromMergeTree::AnalysisResult>(std::move(result)));
}
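/// Clips the ranges in the source's current analysis result against the given per-part
/// ranges (exactly one expected range per part, enforced by chassert) and drops any part
/// or range that falls entirely outside its expected range.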
void SparkStorageMergeTree::wrapRangesInDataParts(DB::ReadFromMergeTree & source, const DB::RangesInDataParts & ranges)
{
auto result = source.getAnalysisResult();
std::unordered_map<String, std::tuple<size_t, size_t>> range_index;
for (const auto & part_with_range : ranges)
{
chassert(part_with_range.ranges.size() == 1);
const auto & range = part_with_range.ranges.at(0);
range_index.emplace(part_with_range.data_part->name, std::make_tuple(range.begin, range.end));
}
RangesInDataParts final;
for (auto & parts_with_range : result.parts_with_ranges)
{
if (!range_index.contains(parts_with_range.data_part->name))
continue;
auto expected_range = range_index.at(parts_with_range.data_part->name);
MarkRanges final_ranges;
for (const auto & range : parts_with_range.ranges)
{
const size_t begin = std::max(range.begin, std::get<0>(expected_range));
const size_t end = std::min(range.end, std::get<1>(expected_range));
            // Ranges like [1, 1) (empty) or [5, 2) (inverted) are invalid.
if (begin >= end)
continue;
MarkRange final_range(begin, end);
final_ranges.emplace_back(final_range);
}
parts_with_range.ranges = final_ranges;
final.emplace_back(parts_with_range);
}
result.parts_with_ranges = final;
source.setAnalyzedResult(std::make_shared<ReadFromMergeTree::AnalysisResult>(result));
}
SparkStorageMergeTree::SparkStorageMergeTree(
const StorageID & table_id_,
const String & relative_data_path_,
const StorageInMemoryMetadata & metadata_,
bool attach,
const ContextMutablePtr & context_,
const String & date_column_name,
const MergingParams & merging_params_,
std::unique_ptr<MergeTreeSettings> storage_settings_,
bool /*has_force_restore_data_flag*/)
: MergeTreeData(
table_id_,
metadata_,
context_,
date_column_name,
merging_params_,
std::move(storage_settings_),
false, /// require_part_metadata
attach ? LoadingStrictnessLevel::ATTACH : LoadingStrictnessLevel::FORCE_RESTORE)
, reader(*this)
, merger_mutator(*this)
{
relative_data_path = relative_data_path_;
format_version = 1;
}
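/// Monotonically increasing counter used to fabricate unique block numbers for parts
/// loaded via loadDataPartsWithNames.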
std::atomic<int> SparkStorageMergeTree::part_num;
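/// The prefetch helpers below read a part file to EOF and discard the contents, so that
/// the data lands in whatever cache fronts the remote disk; they are no-ops when the
/// first disk is not remote.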
void SparkStorageMergeTree::prefetchPartDataFile(const std::unordered_set<std::string> & parts) const
{
prefetchPartFiles(parts, CompactObjectStorageDiskTransaction::PART_DATA_FILE_NAME);
}
void SparkStorageMergeTree::prefetchPartFiles(const std::unordered_set<std::string> & parts, String file_name) const
{
auto disk = getDisks().front();
if (!disk->isRemote())
return;
std::vector<String> data_paths;
std::ranges::for_each(parts, [&](const String & name) { data_paths.emplace_back(fs::path(relative_data_path) / name / file_name); });
auto read_settings = ReadSettings{};
read_settings.remote_fs_method = RemoteFSReadMethod::read;
for (const auto & data_path : data_paths)
{
if (!disk->existsFile(data_path))
continue;
LOG_DEBUG(log, "Prefetching part file {}", data_path);
auto in = disk->readFile(data_path, read_settings);
String ignore_data;
readStringUntilEOF(ignore_data, *in);
}
}
void SparkStorageMergeTree::prefetchMetaDataFile(const std::unordered_set<std::string> & parts) const
{
prefetchPartFiles(parts, CompactObjectStorageDiskTransaction::PART_META_FILE_NAME);
}
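/// Loads the named parts from the first disk of the storage policy, assigning each one a
/// synthetic "all_<n>_<n>_0" part info drawn from part_num, and registers them as Active.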
std::vector<MergeTreeDataPartPtr> SparkStorageMergeTree::loadDataPartsWithNames(const std::unordered_set<std::string> & parts)
{
Stopwatch watch;
prefetchMetaDataFile(parts);
std::vector<MergeTreeDataPartPtr> data_parts;
const auto disk = getStoragePolicy()->getDisks().at(0);
for (const auto & name : parts)
{
const auto num = part_num.fetch_add(1);
MergeTreePartInfo part_info = {"all", num, num, 0};
auto res = loadDataPart(part_info, name, disk, MergeTreeDataPartState::Active);
data_parts.emplace_back(res.part);
}
watch.stop();
LOG_DEBUG(log, "Loaded data parts ({} items) took {} microseconds", parts.size(), watch.elapsedMicroseconds());
ProfileEvents::increment(ProfileEvents::LoadedDataParts, parts.size());
ProfileEvents::increment(ProfileEvents::LoadedDataPartsMicroseconds, watch.elapsedMicroseconds());
return data_parts;
}
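/// Loads a single part from disk and inserts it into data_parts_indexes. Retryable errors
/// (e.g. lack of memory, network issues) are rethrown; other loading failures are swallowed
/// and an incomplete result is returned without marking the part broken. A duplicate part
/// name with an identical checksum is flagged as a duplicate; a differing checksum throws
/// DUPLICATE_DATA_PART.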
MergeTreeData::LoadPartResult SparkStorageMergeTree::loadDataPart(
const MergeTreePartInfo & part_info, const String & part_name, const DiskPtr & part_disk_ptr, MergeTreeDataPartState to_state)
{
LOG_TRACE(log, "Loading {} part {} from disk {}", magic_enum::enum_name(to_state), part_name, part_disk_ptr->getName());
LoadPartResult res;
auto single_disk_volume = std::make_shared<SingleDiskVolume>("volume_" + part_name, part_disk_ptr, 0);
String part_path = fs::path(relative_data_path) / part_name;
auto data_part_storage = std::make_shared<DataPartStorageOnDiskFull>(single_disk_volume, "", part_path);
try
{
res.part = getDataPartBuilder(part_name, single_disk_volume, part_name, getContext()->getReadSettings())
.withPartInfo(part_info)
.withPartFormatFromDisk()
.build();
}
catch (...)
{
        /// Don't count the part as broken if there was a retryable error
        /// during loading, such as "not enough memory" or a network error.
if (isRetryableException(std::current_exception()))
throw;
LOG_DEBUG(log, "Failed to load data part {}, unknown exception", part_name);
return res;
}
try
{
res.part->loadColumnsChecksumsIndexes(require_part_metadata, true);
}
catch (...)
{
        /// Don't count the part as broken if there was a retryable error
        /// during loading, such as "not enough memory" or a network error.
if (isRetryableException(std::current_exception()))
throw;
return res;
}
res.part->modification_time = part_disk_ptr->getLastModified(fs::path(relative_data_path) / part_name).epochTime();
res.part->loadVersionMetadata();
res.part->setState(to_state);
auto parts_lock = lockParts();
DataPartIteratorByInfo it;
bool inserted;
{
LOG_TEST(log, "loadDataPart: inserting {} into data_parts_indexes", res.part->getNameWithState());
std::tie(it, inserted) = data_parts_indexes.insert(res.part);
}
/// Remove duplicate parts with the same checksum.
if (!inserted)
{
if ((*it)->checksums.getTotalChecksumHex() == res.part->checksums.getTotalChecksumHex())
{
LOG_ERROR(log, "Remove duplicate part {}", data_part_storage->getFullPath());
res.part->is_duplicate = true;
return res;
}
else
throw Exception(ErrorCodes::DUPLICATE_DATA_PART, "Part {} already exists but with different checksums", res.part->name);
}
// if (to_state == DataPartState::Active)
// addPartContributionToDataVolume(res.part);
if (res.part->hasLightweightDelete())
has_lightweight_delete_parts.store(true);
    // Without this, the test "mergetree optimize partitioned by one low card column" logs an ERROR.
resetColumnSizes();
calculateColumnAndSecondaryIndexSizesImpl();
LOG_TRACE(log, "Finished loading {} part {} on disk {}", magic_enum::enum_name(to_state), part_name, part_disk_ptr->getName());
return res;
}
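/// Detaches a part from the in-memory indexes only; nothing is removed on disk.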
void SparkStorageMergeTree::removePartFromMemory(const MergeTreeData::DataPart & part_to_detach)
{
auto lock = lockParts();
    bool removed_active_part = false;
auto it_part = data_parts_by_info.find(part_to_detach.info);
if (it_part == data_parts_by_info.end())
{
LOG_DEBUG(log, "No such data part {}", part_to_detach.getNameWithState());
return;
}
/// What if part_to_detach is a reference to *it_part? Make a new owner just in case.
/// Important to own part pointer here (not const reference), because it will be removed from data_parts_indexes
/// few lines below.
DataPartPtr part = *it_part; // NOLINT
if (part->getState() == DataPartState::Active)
{
removePartContributionToColumnAndSecondaryIndexSizes(part);
removed_active_part = true;
}
modifyPartState(it_part, DataPartState::Deleting);
LOG_TEST(log, "removePartFromMemory: removing {} from data_parts_indexes", part->getNameWithState());
data_parts_indexes.erase(it_part);
    if (removed_active_part)
resetObjectColumnsFromActiveParts(lock);
}
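/// The partition and mutation maintenance operations below are not used by the Spark
/// integration; the overrides either fail fast or are no-ops.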
void SparkStorageMergeTree::dropPartNoWaitNoThrow(const String & /*part_name*/)
{
    throw std::runtime_error("not implemented");
}
void SparkStorageMergeTree::dropPart(const String & /*part_name*/, bool /*detach*/, ContextPtr /*context*/)
{
    throw std::runtime_error("not implemented");
}
void SparkStorageMergeTree::dropPartition(const ASTPtr & /*partition*/, bool /*detach*/, ContextPtr /*context*/)
{
}
PartitionCommandsResultInfo SparkStorageMergeTree::attachPartition(
const ASTPtr & /*partition*/, const StorageMetadataPtr & /*metadata_snapshot*/, bool /*part*/, ContextPtr /*context*/)
{
    throw std::runtime_error("not implemented");
}
void SparkStorageMergeTree::replacePartitionFrom(
const StoragePtr & /*source_table*/, const ASTPtr & /*partition*/, bool /*replace*/, ContextPtr /*context*/)
{
    throw std::runtime_error("not implemented");
}
void SparkStorageMergeTree::movePartitionToTable(const StoragePtr & /*dest_table*/, const ASTPtr & /*partition*/, ContextPtr /*context*/)
{
    throw std::runtime_error("not implemented");
}
bool SparkStorageMergeTree::partIsAssignedToBackgroundOperation(const MergeTreeData::DataPartPtr & /*part*/) const
{
    throw std::runtime_error("not implemented");
}
std::string SparkStorageMergeTree::getName() const
{
    throw std::runtime_error("not implemented");
}
std::vector<MergeTreeMutationStatus> SparkStorageMergeTree::getMutationsStatus() const
{
    throw std::runtime_error("not implemented");
}
bool SparkStorageMergeTree::scheduleDataProcessingJob(BackgroundJobsAssignee & /*executor*/)
{
    throw std::runtime_error("not implemented");
}
void SparkStorageMergeTree::startBackgroundMovesIfNeeded()
{
    throw std::runtime_error("not implemented");
}
std::unique_ptr<MergeTreeSettings> SparkStorageMergeTree::getDefaultSettings() const
{
    throw std::runtime_error("not implemented");
}
std::map<std::string, MutationCommands> SparkStorageMergeTree::getUnfinishedMutationCommands() const
{
    throw std::runtime_error("not implemented");
}
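/// Writes a block as a new temporary MergeTree part under part_dir: computes the min/max
/// index and partition, sorts by the sorting key when the block is not already sorted, and
/// streams the data through MergedBlockOutputStream. Returns a temp part with no data part
/// for empty blocks.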
MergeTreeTemporaryPartPtr SparkMergeTreeDataWriter::writeTempPart(
BlockWithPartition & block_with_partition,
const StorageMetadataPtr & metadata_snapshot,
const ContextPtr & context,
const std::string & part_dir) const
{
auto temp_part = std::make_unique<MergeTreeTemporaryPart>();
Block & block = block_with_partition.block;
auto columns = metadata_snapshot->getColumns().getAllPhysical().filter(block.getNames());
for (auto & column : columns)
if (column.type->hasDynamicSubcolumns())
column.type = block.getByName(column.name).type;
auto minmax_idx = std::make_shared<IMergeTreeDataPart::MinMaxIndex>();
minmax_idx->update(block, MergeTreeData::getMinMaxColumnsNames(metadata_snapshot->getPartitionKey()));
MergeTreePartition partition(block_with_partition.partition);
MergeTreePartInfo new_part_info(partition.getID(metadata_snapshot->getPartitionKey().sample_block), 1, 1, 0);
temp_part->temporary_directory_lock = data.getTemporaryPartDirectoryHolder(part_dir);
auto indices = MergeTreeIndexFactory::instance().getMany(metadata_snapshot->getSecondaryIndices());
    /// Calculate any extra columns needed by the sorting key and skip indices.
if (metadata_snapshot->hasSortingKey() || metadata_snapshot->hasSecondaryIndices())
data.getSortingKeyAndSkipIndicesExpression(metadata_snapshot, indices)->execute(block);
Names sort_columns = metadata_snapshot->getSortingKeyColumns();
SortDescription sort_description;
size_t sort_columns_size = sort_columns.size();
sort_description.reserve(sort_columns_size);
for (size_t i = 0; i < sort_columns_size; ++i)
sort_description.emplace_back(sort_columns[i], 1, 1);
/// Sort
IColumn::Permutation * perm_ptr = nullptr;
IColumn::Permutation perm;
if (!sort_description.empty())
{
if (!isAlreadySorted(block, sort_description))
{
stableGetPermutation(block, sort_description, perm);
perm_ptr = &perm;
}
}
Names partition_key_columns = metadata_snapshot->getPartitionKey().column_names;
    /// The size of the part will not be greater than block.bytes() + epsilon.
size_t expected_size = block.bytes();
    /// If optimize_on_insert is true, the block may become empty after merging.
    /// There is no need to create an empty part.
if (expected_size == 0)
return temp_part;
VolumePtr volume = data.getStoragePolicy()->getVolume(0);
VolumePtr data_part_volume = std::make_shared<SingleDiskVolume>(volume->getName(), volume->getDisk(), volume->max_data_part_size);
auto new_data_part = data.getDataPartBuilder(part_dir, data_part_volume, part_dir, context->getReadSettings())
.withPartFormat(data.choosePartFormat(expected_size, block.rows()))
.withPartInfo(new_part_info)
.build();
auto data_part_storage = new_data_part->getDataPartStoragePtr();
const MergeTreeSettings & data_settings = *data.getSettings();
SerializationInfo::Settings settings{data_settings[MergeTreeSetting::ratio_of_defaults_for_sparse_serialization], true};
SerializationInfoByName infos(columns, settings);
infos.add(block);
new_data_part->setColumns(columns, infos, metadata_snapshot->getMetadataVersion());
new_data_part->rows_count = block.rows();
new_data_part->partition = std::move(partition);
new_data_part->minmax_idx = std::move(minmax_idx);
data_part_storage->beginTransaction();
if (data_settings[MergeTreeSetting::assign_part_uuids])
new_data_part->uuid = UUIDHelpers::generateV4();
SyncGuardPtr sync_guard;
/// The name could be non-unique in case of stale files from previous runs.
String full_path = new_data_part->getDataPartStorage().getFullPath();
if (new_data_part->getDataPartStorage().exists())
{
LOG_WARNING(log, "Removing old temporary directory {}", full_path);
data_part_storage->removeRecursive();
}
data_part_storage->createDirectories();
if ((*data.getSettings())[MergeTreeSetting::fsync_part_directory])
{
const auto disk = data_part_volume->getDisk();
sync_guard = disk->getDirectorySyncGuard(full_path);
}
    /// This effectively chooses the minimal compression method:
    /// either the default LZ4 or a compression method with zero thresholds on absolute and relative part size.
auto compression_codec = data.getContext()->chooseCompressionCodec(0, 0);
auto txn = context->getCurrentTransaction();
auto index_granularity_ptr = createMergeTreeIndexGranularity(
block.rows(),
block.bytes(),
*data.getSettings(),
new_data_part->index_granularity_info,
/*blocks_are_granules=*/false);
auto out = std::make_unique<MergedBlockOutputStream>(
new_data_part,
metadata_snapshot,
columns,
indices,
MergeTreeStatisticsFactory::instance().getMany(metadata_snapshot->getColumns()),
compression_codec,
index_granularity_ptr,
txn ? txn->tid : Tx::PrehistoricTID,
block.bytes(),
false,
false,
context->getWriteSettings());
out->writeWithPermutation(block, perm_ptr);
auto finalizer = out->finalizePartAsync(new_data_part, data_settings[MergeTreeSetting::fsync_after_insert], nullptr, nullptr);
temp_part->part = new_data_part;
temp_part->streams.emplace_back(MergeTreeTemporaryPart::Stream{.stream = std::move(out), .finalizer = std::move(finalizer)});
temp_part->finalize();
data_part_storage->commitTransaction();
return temp_part;
}
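/// Assembles the MergeTreeSettings used for writes from the Spark-side table configuration
/// and session settings; nullable keys are always allowed.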
std::unique_ptr<MergeTreeSettings>
SparkWriteStorageMergeTree::buildMergeTreeSettings(const ContextMutablePtr & context, const MergeTreeTableSettings & config)
{
    // TODO: set settings through ASTStorage
auto settings = std::make_unique<DB::MergeTreeSettings>();
settings->set("allow_nullable_key", Field(true));
if (!config.storage_policy.empty())
settings->set("storage_policy", Field(config.storage_policy));
if (settingsEqual(context->getSettingsRef(), "merge_tree.assign_part_uuids", "true"))
settings->set("assign_part_uuids", Field(true));
if (String min_rows_for_wide_part; tryGetString(context->getSettingsRef(), "merge_tree.min_rows_for_wide_part", min_rows_for_wide_part))
settings->set("min_rows_for_wide_part", Field(std::strtoll(min_rows_for_wide_part.c_str(), nullptr, 10)));
if (settingsEqual(context->getSettingsRef(), "merge_tree.write_marks_for_substreams_in_compact_parts", "true"))
settings->set("write_marks_for_substreams_in_compact_parts", Field(true));
return settings;
}
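/// Creates a SparkMergeTreeSink that writes inserted blocks as MergeTree parts; in debug
/// builds, asserts that the storage resolved for the table is this instance.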
SinkToStoragePtr SparkWriteStorageMergeTree::write(
const ASTPtr &, const StorageMetadataPtr & /*storage_in_memory_metadata*/, ContextPtr context, bool /*async_insert*/)
{
#ifndef NDEBUG
auto dest_storage = table.getStorage(getContext());
assert(dest_storage.get() == this);
#endif
return SparkMergeTreeSink::create(table, SparkMergeTreeWriteSettings{context}, getContext());
}
}