blob: b1b9c596839c823642b6fce1f1024d315e4b614a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "CacheManager.h"
#include <ranges>
#include <Core/Settings.h>
#include <Disks/IStoragePolicy.h>
#include <Disks/ObjectStorages/MetadataStorageFromDisk.h>
#include <Interpreters/Cache/FileCache.h>
#include <Interpreters/Cache/FileCacheFactory.h>
#include <Interpreters/Context.h>
#include <Processors/Executors/PipelineExecutor.h>
#include <Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.h>
#include <Processors/QueryPlan/QueryPlan.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <Storages/MergeTree/MetaDataHelper.h>
#include <jni/jni_common.h>
#include <Common/Logger.h>
#include <Common/ThreadPool.h>
#include <Common/logger_useful.h>
namespace DB
{
namespace Setting
{
extern const SettingsUInt64 max_block_size;
}
namespace ErrorCodes
{
extern const int INVALID_STATE;
}
}
namespace CurrentMetrics
{
extern const Metric LocalThread;
extern const Metric LocalThreadActive;
extern const Metric LocalThreadScheduled;
}
namespace local_engine
{
using namespace DB;
/// JNI global references to org.apache.gluten.execution.CacheResult and its
/// (int, String) constructor. Populated once by initJNI(); read by getCacheStatus().
jclass CacheManager::cache_result_class = nullptr;
jmethodID CacheManager::cache_result_constructor = nullptr;
void CacheManager::initJNI(JNIEnv * env)
{
    /// Resolve and pin the Java-side result type once; the references are
    /// reused later when building CacheResult objects for getCacheStatus().
    jclass result_class = CreateGlobalClassReference(env, "Lorg/apache/gluten/execution/CacheResult;");
    cache_result_class = result_class;
    cache_result_constructor = GetMethodID(env, result_class, "<init>", "(ILjava/lang/String;)V");
}
CacheManager & CacheManager::instance()
{
    /// Meyers singleton — initialization is thread-safe since C++11.
    static CacheManager the_instance;
    return the_instance;
}
void CacheManager::initialize(const DB::ContextMutablePtr & context_)
{
    /// Store the global query context on the singleton; all cache jobs
    /// created afterwards capture this context.
    instance().context = context_;
}
/// Per-task copy of the table metadata; cachePart() trims it down to a
/// single part so each scheduled task owns an independent snapshot.
struct CacheJobContext
{
    MergeTreeTableInstance table;
};
/// Build a task that warms the cache for a single part of a MergeTree table.
///
/// @param table           table metadata; copied and trimmed to the one part.
/// @param part            the part whose data/metadata should be cached.
/// @param columns         column names to read when loading full data cache.
/// @param only_meta_cache when true, only restore part metadata (no data read).
/// @return a Task that performs the load; errors are logged and re-thrown so
///         the job scheduler can record the failure.
Task CacheManager::cachePart(
    const MergeTreeTableInstance & table, const MergeTreePart & part, const std::unordered_set<String> & columns, bool only_meta_cache)
{
    CacheJobContext job_context{table};
    /// Restrict the snapshot to exactly this part.
    job_context.table.parts.clear();
    job_context.table.parts.push_back(part);
    job_context.table.snapshot_id = "";
    MergeTreeCacheConfig config = MergeTreeCacheConfig::loadFromContext(context);
    Task task = [job_detail = job_context, context = this->context, read_columns = columns, only_meta_cache,
                 prefetch_data = config.enable_data_prefetch]()
    {
        try
        {
            auto storage = job_detail.table.restoreStorage(context);
            std::vector<DataPartPtr> selected_parts
                = StorageMergeTreeFactory::getDataPartsByNames(storage->getStorageID(), "", {job_detail.table.parts.front().name});
            if (only_meta_cache)
            {
                LOG_INFO(
                    getLogger("CacheManager"),
                    "Load meta cache of table {}.{} part {} success.",
                    job_detail.table.database,
                    job_detail.table.table,
                    job_detail.table.parts.front().name);
                return;
            }
            // prefetch part data
            if (prefetch_data)
                storage->prefetchPartDataFile({job_detail.table.parts.front().name});
            auto storage_snapshot = std::make_shared<StorageSnapshot>(*storage, storage->getInMemoryMetadataPtr());
            /// Keep only the requested columns (intersection with table schema).
            NamesAndTypesList names_and_types_list;
            auto meta_columns = storage->getInMemoryMetadata().getColumns();
            for (const auto & column : meta_columns)
            {
                if (read_columns.contains(column.name))
                    names_and_types_list.push_back(NameAndTypePair(column.name, column.type));
            }
            auto query_info = buildQueryInfo(names_and_types_list);
            auto read_step = storage->reader.readFromParts(
                RangesInDataParts({selected_parts}),
                storage->getMutationsSnapshot({}),
                names_and_types_list.getNames(),
                storage_snapshot,
                *query_info,
                context,
                context->getSettingsRef()[Setting::max_block_size],
                1);
            QueryPlan plan;
            plan.addStep(std::move(read_step));
            DB::QueryPlanOptimizationSettings optimization_settings{context};
            DB::BuildQueryPipelineSettings build_settings{context};
            auto pipeline_builder = plan.buildQueryPipeline(optimization_settings, build_settings);
            auto pipeline = QueryPipelineBuilder::getPipeline(std::move(*pipeline_builder.get()));
            /// Drain the pipeline; reading the blocks is what populates the cache.
            PullingPipelineExecutor executor(pipeline);
            while (true)
            {
                if (Chunk chunk; !executor.pull(chunk))
                    break;
            }
            LOG_INFO(getLogger("CacheManager"), "Load cache of table {}.{} part {} success.", job_detail.table.database, job_detail.table.table, job_detail.table.parts.front().name);
        }
        catch (const std::exception & e) // catch by const ref, not by non-const ref
        {
            LOG_ERROR(getLogger("CacheManager"), "Load cache of table {}.{} part {} failed.\n {}", job_detail.table.database, job_detail.table.table, job_detail.table.parts.front().name, e.what());
            throw; // rethrow the active exception; equivalent to rethrow_exception(current_exception()) but simpler
        }
    };
    LOG_INFO(getLogger("CacheManager"), "Loading cache of table {}.{} part {}", job_context.table.database, job_context.table.table, job_context.table.parts.front().name);
    return task; // no std::move: allow NRVO (returning std::move(local) is a pessimization)
}
/// Schedule one cache-loading job containing one task per part of `table`.
/// Returns the generated job id for later status polling.
JobId CacheManager::cacheParts(const MergeTreeTableInstance & table, const std::unordered_set<String>& columns, bool only_meta_cache)
{
    JobId job_id = toString(UUIDHelpers::generateV4());
    Job job(job_id);
    for (const auto & part : table.parts)
        job.addTask(cachePart(table, part, columns, only_meta_cache));
    JobScheduler::instance().scheduleJob(std::move(job));
    return job_id;
}
/// Translate the scheduler's job status into a Java CacheResult object:
/// 0 = running, 1 = finished, 2 = failed / unknown job id.
jobject CacheManager::getCacheStatus(JNIEnv * env, const String & jobId)
{
    int status = 0;
    String message;
    const auto job_status = JobScheduler::instance().getJobSatus(jobId);
    if (!job_status.has_value())
    {
        status = 2;
        message = fmt::format("job {} not found", jobId);
    }
    else
    {
        switch (job_status->status)
        {
            case JobSatus::RUNNING:
                status = 0;
                break;
            case JobSatus::FINISHED:
                status = 1;
                break;
            case JobSatus::FAILED:
                status = 2;
                /// Concatenate every failure message, ';'-separated.
                for (const auto & msg : job_status->messages)
                {
                    message.append(msg);
                    message.append(";");
                }
                break;
        }
    }
    return env->NewObject(cache_result_class, cache_result_constructor, status, charTojstring(env, message.c_str()));
}
/// Build a task that warms the file cache for one file: the file is read to
/// EOF through `read_buffer_builder`, which populates the cache as a side
/// effect. Errors are logged and re-thrown for the job scheduler.
Task CacheManager::cacheFile(const substrait::ReadRel::LocalFiles::FileOrFiles & file, ReadBufferBuilderPtr read_buffer_builder)
{
    auto task = [file, read_buffer_builder, context = this->context]()
    {
        LOG_INFO(getLogger("CacheManager"), "Loading cache file {}", file.uri_file());
        try
        {
            std::unique_ptr<DB::ReadBuffer> rb = read_buffer_builder->build(file);
            /// Reading the whole file is what pulls it into the cache.
            while (!rb->eof())
                rb->ignoreAll();
        }
        catch (const std::exception & e) // catch by const ref, not by non-const ref
        {
            LOG_ERROR(getLogger("CacheManager"), "Load cache file {} failed.\n {}", file.uri_file(), e.what());
            throw; // rethrow the active exception; simpler than rethrow_exception(current_exception())
        }
    };
    return task; // no std::move: allow NRVO (returning std::move(local) is a pessimization)
}
/// Schedule a job that caches every file in `file_infos`.
/// All files are assumed to share the scheme of the first URI (one builder is
/// created from it). If the Gluten cache is disabled, an empty job is still
/// scheduled so callers always get a pollable job id.
JobId CacheManager::cacheFiles(substrait::ReadRel::LocalFiles file_infos)
{
    JobId id = toString(UUIDHelpers::generateV4());
    Job job(id);
    // NOTE: the previous revision fetched context->getReadSettings() into an
    // unused local; that dead statement has been removed.
    if (file_infos.items_size())
    {
        const Poco::URI file_uri(file_infos.items().Get(0).uri_file());
        const auto read_buffer_builder = ReadBufferBuilderFactory::instance().createBuilder(file_uri.getScheme(), context);
        if (context->getConfigRef().getBool(GlutenCacheConfig::ENABLED, false))
            for (const auto & file : file_infos.items())
                job.addTask(cacheFile(file, read_buffer_builder));
        else
            LOG_WARNING(getLogger("CacheManager"), "Load cache skipped because cache not enabled.");
    }
    auto & scheduler = JobScheduler::instance();
    scheduler.scheduleJob(std::move(job));
    return id;
}
/// Drop `file` from the file cache named `cache_name`, if that cache exists.
// only for ut
void CacheManager::removeFiles(String file, String cache_name)
{
    for (const auto & [name, file_cache] : FileCacheFactory::instance().getAll())
    {
        if (name == cache_name)
        {
            if (const auto cache = file_cache->cache)
                cache->removePathIfExists(file, DB::FileCache::getCommonUser().user_id);
        }
    }
}
}