blob: 8e24c271d1e7e5768019396bd804d8df2f5afc6e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "SparkMergeTreeSink.h"
#include <Core/Settings.h>
#include <Disks/IDiskTransaction.h>
#include <IO/copyData.h>
#include <Interpreters/MergeTreeTransaction.h>
#include <Storages/MergeTree/MetaDataHelper.h>
#include <Common/QueryContext.h>
namespace CurrentMetrics
{
/// Metrics for the local single-thread pool owned by SinkHelper (async part merges).
extern const Metric LocalThread;
extern const Metric LocalThreadActive;
extern const Metric LocalThreadScheduled;
/// Metrics declared for the global thread pool (used by DirectSinkHelper::cleanup).
extern const Metric GlobalThread;
extern const Metric GlobalThreadActive;
extern const Metric GlobalThreadScheduled;
}
namespace DB::Setting
{
/// Squashing thresholds forwarded to the SparkMergeTreeSink constructor in create().
extern const SettingsUInt64 min_insert_block_size_rows;
extern const SettingsUInt64 min_insert_block_size_bytes;
}
namespace local_engine
{
using namespace DB;
void SparkMergeTreeSink::write(const Chunk & chunk)
{
CurrentThread::flushUntrackedMemory();
{
PartWithStats part_with_stats{
.data_part = nullptr,
.delta_stats = empty_delta_stats_
.transform(
[&](const auto & stats)
{
auto newStats = std::make_shared<DeltaStats>(stats);
newStats->update(chunk);
return newStats;
})
.value_or(nullptr)};
/// Reset earlier, so put it in the scope
BlockWithPartition item{getHeader().cloneWithColumns(chunk.getColumns()), Row{}};
sink_helper->writeTempPart(item, std::move(part_with_stats), context, part_num);
part_num++;
}
}
/// Feeds the incoming chunk into the squashing buffer; when the buffer reaches
/// its thresholds, writes the accumulated block and checks for pending merges.
void SparkMergeTreeSink::consume(Chunk & chunk)
{
    // Take ownership of the caller's chunk, leaving it empty as the contract requires.
    Chunk incoming;
    incoming.swap(chunk);
    squashed_chunk = squashing.add(std::move(incoming));
    if (squashed_chunk)
    {
        write(Squashing::squash(std::move(squashed_chunk)));
        sink_helper->checkAndMerge();
    }
    assert(squashed_chunk.getNumRows() == 0);
    assert(chunk.getNumRows() == 0);
}
/// No per-sink setup is needed before the first consume() call.
void SparkMergeTreeSink::onStart()
{
    // Intentionally empty.
}
/// Flushes the squashing buffer, finalizes the sink helper (merge/commit/metadata),
/// and reports write statistics when a stats collector was supplied.
void SparkMergeTreeSink::onFinish()
{
    // consume() always drains squashed_chunk after writing, so it must be empty here.
    assert(squashed_chunk.getNumRows() == 0);
    squashed_chunk = squashing.flush();
    if (static_cast<bool>(squashed_chunk))
        write(Squashing::squash(std::move(squashed_chunk)));
    assert(squashed_chunk.getNumRows() == 0);
    // Triggers the final merge (if enabled), commits parts, and saves metadata.
    sink_helper->finish(context);
    if (stats_.has_value())
        (*stats_)->collectStats(sink_helper->unsafeGet(), sink_helper->write_settings.partition_settings.partition_dir);
}
/////
/// Factory: picks the sink-helper strategy (local staging + upload vs. direct
/// write) based on the destination disk and settings, then builds the sink.
SinkToStoragePtr SparkMergeTreeSink::create(
    const MergeTreeTable & merge_tree_table,
    const SparkMergeTreeWriteSettings & write_settings_,
    const DB::ContextMutablePtr & context,
    const DeltaStatsOption & delta_stats,
    const SinkStatsOption & stats)
{
    // The prefix is what makes generated part directory names unique.
    if (write_settings_.partition_settings.part_name_prefix.empty())
        throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "empty part_name_prefix is not allowed.");

    auto dest_storage = merge_tree_table.getStorage(context);
    const bool is_remote = dest_storage->getStoragePolicy()->getAnyDisk()->isRemote();
    const bool use_local_staging = !write_settings_.insert_without_local_storage;

    SinkHelperPtr helper;
    if (is_remote && use_local_staging)
    {
        // Remote destination: write into a local temp table first, then upload.
        auto staging_table = merge_tree_table.copyToDefaultPolicyStorage(context);
        LOG_DEBUG(
            &Poco::Logger::get("SparkMergeTreeWriter"),
            "Create temp table {} for local merge.",
            staging_table->getStorageID().getFullNameNotQuoted());
        helper = std::make_shared<CopyToRemoteSinkHelper>(staging_table, dest_storage, write_settings_);
    }
    else
    {
        helper = std::make_shared<DirectSinkHelper>(dest_storage, write_settings_, is_remote);
    }

    const DB::Settings & settings = context->getSettingsRef();
    return std::make_shared<SparkMergeTreeSink>(
        helper,
        std::make_shared<const DB::Block>(helper->metadata_snapshot->getSampleBlock()),
        context,
        delta_stats,
        stats,
        settings[Setting::min_insert_block_size_rows],
        settings[Setting::min_insert_block_size_bytes]);
}
/// Captures the storage and a metadata snapshot; the pool (1 thread, queue of
/// 100000) serializes the async part merges scheduled by doMergePartsAsync().
SinkHelper::SinkHelper(const SparkStorageMergeTreePtr & data_, const SparkMergeTreeWriteSettings & write_settings_, bool isRemoteStorage_)
    : data(data_)
    , isRemoteStorage(isRemoteStorage_)
    , thread_pool(CurrentMetrics::LocalThread, CurrentMetrics::LocalThreadActive, CurrentMetrics::LocalThreadScheduled, 1, 1, 100000)
    , write_settings(write_settings_)
    , metadata_snapshot(data->getInMemoryMetadataPtr())
{
}
/// Persists file-status metadata for each written part. Only meaningful for
/// remote storages; a no-op otherwise.
void SinkHelper::saveMetadata(const DB::ContextPtr & context)
{
    if (!isRemoteStorage)
        return;

    for (const auto & part_with_stats : new_parts.unsafeGet())
    {
        const auto & part_name = part_with_stats.data_part->name;
        // Re-load the part from the destination storage; skip (with a warning)
        // if it cannot be found there.
        auto loaded = dest_storage().loadDataPartsWithNames({part_name});
        if (loaded.empty())
        {
            LOG_WARNING(
                &Poco::Logger::get("SparkMergeTreeWriter"),
                "Save metadata failed because dest storage load part name {} empty.",
                part_name);
            continue;
        }
        saveFileStatus(
            dest_storage(), context, part_name, const_cast<IDataPartStorage &>(loaded.at(0)->getDataPartStorage()));
    }
}
void SinkHelper::doMergePartsAsync(const std::vector<PartWithStats> & merge_parts_with_stats)
{
for (const auto & selected_part : merge_parts_with_stats)
tmp_parts.emplace(selected_part.data_part->name);
// check a thread group initialized in task thread
currentThreadGroupMemoryUsage();
thread_pool.scheduleOrThrow(
[this, merge_parts_with_stats, thread_group = CurrentThread::getGroup()]() -> void
{
ThreadGroupSwitcher switcher(thread_group, "AsyncMerge");
Stopwatch watch;
size_t before_size = 0;
std::vector<DB::DataPartPtr> prepare_merge_parts_;
for (const auto & prepare_merge_part : merge_parts_with_stats)
{
before_size += prepare_merge_part.data_part->getBytesOnDisk();
prepare_merge_parts_.emplace_back(prepare_merge_part.data_part);
}
const auto merged_part = mergeParts(
prepare_merge_parts_,
toString(UUIDHelpers::generateV4()),
dataRef(),
write_settings.partition_settings.partition_dir,
write_settings.partition_settings.bucket_dir);
size_t after_size = merged_part->getBytesOnDisk();
if (std::ranges::any_of(merge_parts_with_stats, [](const auto & part) { return part.delta_stats == nullptr; }))
{
// no stats
new_parts.emplace_back(PartWithStats{std::move(merged_part), nullptr});
}
else
{
auto merge_stats = merge_parts_with_stats.begin()->delta_stats;
for (auto begin = merge_parts_with_stats.begin() + 1; begin != merge_parts_with_stats.end(); ++begin)
merge_stats->merge(*begin->delta_stats);
new_parts.emplace_back(PartWithStats{std::move(merged_part), std::move(merge_stats)});
}
watch.stop();
LOG_INFO(
&Poco::Logger::get("SparkMergeTreeWriter"),
"Merge success. Before merge part size {}, part count {}, after part size {}, part count {}, "
"total elapsed {} ms",
before_size, // before size
merge_parts_with_stats.size(), // before part count
after_size, // after size
1, // after part count
watch.elapsedMilliseconds());
});
}
/// Writes one block as a temporary part and queues it (with its stats) in new_parts.
/// Partitioned tables are not supported by this path.
void SinkHelper::writeTempPart(
    DB::BlockWithPartition & block_with_partition, PartWithStats part_with_stats, const ContextPtr & context, int part_num)
{
    assert(!metadata_snapshot->hasPartitionKey());

    // Optimize (compaction) tasks produce a single "-merged" part; regular
    // inserts get a zero-padded sequence number.
    const std::string & prefix = write_settings.partition_settings.part_name_prefix;
    const std::string part_dir = write_settings.is_optimize_task
        ? fmt::format("{}-merged", prefix)
        : fmt::format("{}_{:03d}", prefix, part_num);

    const auto tmp = dataRef().getWriter().writeTempPart(block_with_partition, metadata_snapshot, context, part_dir);
    part_with_stats.data_part = tmp->part;
    new_parts.emplace_back(std::move(part_with_stats));
}
/// Merge-on-insert: drains new_parts, batching small parts and handing each
/// batch to an async merge. `force` (used only by finalizeMerge) flushes
/// everything, including a final undersized batch.
void SinkHelper::checkAndMerge(bool force)
{
    if (!write_settings.merge_after_insert)
        return;
    // Only finalize should force merge.
    if (!force && new_parts.size() < write_settings.merge_limit_parts)
        return;

    std::vector<PartWithStats> selected_parts;
    selected_parts.reserve(write_settings.merge_limit_parts);
    size_t total_size = 0;
    // Parts already at/above merge_min_size are not merged; they are re-queued untouched.
    std::vector<PartWithStats> skip_parts;

    // Pop until the queue is empty; note async merge tasks may append new parts
    // concurrently, which a later (forced) pass picks up.
    while (const auto merge_tree_data_part_option = new_parts.pop_front())
    {
        auto merge_tree_data_part = merge_tree_data_part_option.value();
        if (merge_tree_data_part.data_part->getBytesOnDisk() >= write_settings.merge_min_size)
        {
            skip_parts.emplace_back(merge_tree_data_part);
            continue;
        }

        selected_parts.emplace_back(merge_tree_data_part);
        total_size += merge_tree_data_part.data_part->getBytesOnDisk();
        // Keep accumulating until either the size or the part-count threshold is reached.
        if (write_settings.merge_min_size > total_size && write_settings.merge_limit_parts > selected_parts.size())
            continue;

        doMergePartsAsync(selected_parts);
        selected_parts.clear();
        total_size = 0;
    }

    // Leftover batch: merge it only when forced and when a merge is meaningful
    // (more than one part); otherwise return the parts to the queue.
    if (!selected_parts.empty())
    {
        if (force && selected_parts.size() > 1)
            doMergePartsAsync(selected_parts);
        else
            new_parts.emplace_back(selected_parts);
    }
    new_parts.emplace_back(skip_parts);
}
/// Final step of a write: waits out/triggers remaining merges (when enabled),
/// commits the parts to the destination, then persists their metadata.
void SinkHelper::finish(const DB::ContextPtr & context)
{
    // Merge-on-insert must fully settle before anything is committed.
    if (write_settings.merge_after_insert)
    {
        finalizeMerge();
    }
    commit(context->getReadSettings(), context->getWriteSettings());
    // No-op unless the destination storage is remote.
    saveMetadata(context);
}
void SinkHelper::finalizeMerge()
{
LOG_DEBUG(&Poco::Logger::get("SparkMergeTreeWriter"), "Waiting all merge task end and do final merge");
// waiting all merge task end and do final merge
thread_pool.wait();
size_t before_merge_size;
do
{
before_merge_size = new_parts.size();
checkAndMerge(true);
thread_pool.wait();
} while (before_merge_size != new_parts.size());
cleanup();
}
void CopyToRemoteSinkHelper::commit(const ReadSettings & read_settings, const WriteSettings & write_settings)
{
LOG_DEBUG(
&Poco::Logger::get("SparkMergeTreeWriter"), "Begin upload to disk {}.", dest_storage().getStoragePolicy()->getAnyDisk()->getName());
const std::deque<PartWithStats> & parts = new_parts.unsafeGet();
Stopwatch watch;
for (const auto & merge_tree_data_part : parts)
{
String local_relative_path = dataRef().getRelativeDataPath() + "/" + merge_tree_data_part.data_part->name;
String remote_relative_path = dest_storage().getRelativeDataPath() + "/" + merge_tree_data_part.data_part->name;
std::vector<String> files;
dataRef().getStoragePolicy()->getAnyDisk()->listFiles(local_relative_path, files);
auto src_disk = dataRef().getStoragePolicy()->getAnyDisk();
auto dest_disk = dest_storage().getStoragePolicy()->getAnyDisk();
auto tx = dest_disk->createTransaction();
for (const auto & file : files)
{
auto read_buffer = src_disk->readFile(local_relative_path + "/" + file, read_settings);
auto write_buffer
= tx->writeFile(remote_relative_path + "/" + file, DBMS_DEFAULT_BUFFER_SIZE, WriteMode::Rewrite, write_settings);
copyData(*read_buffer, *write_buffer);
write_buffer->finalize();
}
tx->commit();
LOG_DEBUG(
&Poco::Logger::get("SparkMergeTreeWriter"),
"Upload part {} to disk {} success.",
merge_tree_data_part.data_part->name,
dest_storage().getStoragePolicy()->getAnyDisk()->getName());
}
watch.stop();
LOG_INFO(
&Poco::Logger::get("SparkMergeTreeWriter"),
"Upload to disk {} finished, total elapsed {} ms",
dest_storage().getStoragePolicy()->getAnyDisk()->getName(),
watch.elapsedMilliseconds());
StorageMergeTreeFactory::freeStorage(temp_storage()->getStorageID());
temp_storage()->dropAllData();
LOG_DEBUG(
&Poco::Logger::get("SparkMergeTreeWriter"), "Clean temp table {} success.", temp_storage()->getStorageID().getFullNameNotQuoted());
}
void DirectSinkHelper::cleanup()
{
// default storage need clean temp.
std::unordered_set<String> final_parts;
for (const auto & merge_tree_data_part : new_parts.unsafeGet())
final_parts.emplace(merge_tree_data_part.data_part->name);
for (const auto & tmp_part : tmp_parts)
{
if (final_parts.contains(tmp_part))
continue;
GlobalThreadPool::instance().scheduleOrThrow(
[storage_ = data, tmp = tmp_part]() -> void
{
for (const auto & disk : storage_->getDisks())
{
auto rel_path = storage_->getRelativeDataPath() + "/" + tmp;
disk->removeRecursive(rel_path);
}
});
}
}
}