blob: c9ae424d4d4fe42d3e87e5611c754ad0c6298b59 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "vec/sink/delta_writer_v2_pool.h"
#include "olap/delta_writer_v2.h"
#include "util/runtime_profile.h"
namespace doris {
#include "common/compile_check_begin.h"
class TExpr;
namespace vectorized {
DeltaWriterV2Map::DeltaWriterV2Map(UniqueId load_id, int num_use, DeltaWriterV2Pool* pool)
: _load_id(load_id), _use_cnt(num_use), _pool(pool) {}
DeltaWriterV2Map::~DeltaWriterV2Map() = default;
std::shared_ptr<DeltaWriterV2> DeltaWriterV2Map::get_or_create(
int64_t tablet_id, std::function<std::unique_ptr<DeltaWriterV2>()> creator) {
std::lock_guard lock(_mutex);
if (_map.contains(tablet_id)) {
return _map.at(tablet_id);
}
std::shared_ptr<DeltaWriterV2> writer = creator();
if (writer != nullptr) {
_map[tablet_id] = writer;
}
return writer;
}
Status DeltaWriterV2Map::close(std::unordered_map<int64_t, int32_t>& segments_for_tablet,
RuntimeProfile* profile) {
int num_use = --_use_cnt;
if (num_use > 0) {
LOG(INFO) << "keeping DeltaWriterV2Map, load_id=" << _load_id << " , use_cnt=" << num_use;
return Status::OK();
}
if (_pool != nullptr) {
_pool->erase(_load_id);
}
LOG(INFO) << "closing DeltaWriterV2Map, load_id=" << _load_id;
std::lock_guard lock(_mutex);
for (auto& [_, writer] : _map) {
RETURN_IF_ERROR(writer->close());
}
LOG(INFO) << "close-waiting DeltaWriterV2Map, load_id=" << _load_id;
for (auto& [tablet_id, writer] : _map) {
int32_t num_segments;
RETURN_IF_ERROR(writer->close_wait(num_segments, profile));
segments_for_tablet[tablet_id] = num_segments;
}
return Status::OK();
}
void DeltaWriterV2Map::cancel(Status status) {
int num_use = --_use_cnt;
LOG(INFO) << "cancelling DeltaWriterV2Map " << _load_id << ", use_cnt=" << num_use;
if (num_use == 0 && _pool != nullptr) {
_pool->erase(_load_id);
}
std::lock_guard lock(_mutex);
for (auto& [_, writer] : _map) {
static_cast<void>(writer->cancel_with_status(status));
}
}
DeltaWriterV2Pool::DeltaWriterV2Pool() = default;
DeltaWriterV2Pool::~DeltaWriterV2Pool() = default;
std::shared_ptr<DeltaWriterV2Map> DeltaWriterV2Pool::get_or_create(PUniqueId load_id,
int num_sink) {
UniqueId id {load_id};
std::lock_guard<std::mutex> lock(_mutex);
std::shared_ptr<DeltaWriterV2Map> map = _pool[id];
if (map) {
return map;
}
map = std::make_shared<DeltaWriterV2Map>(id, num_sink, this);
_pool[id] = map;
return map;
}
void DeltaWriterV2Pool::erase(UniqueId load_id) {
std::lock_guard<std::mutex> lock(_mutex);
LOG(INFO) << "erasing DeltaWriterV2Map, load_id=" << load_id;
_pool.erase(load_id);
}
} // namespace vectorized
} // namespace doris