blob: 3402bb334680bfb3635e8ccab6b441674f5dfd3b [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "recycler/obj_storage_client.h"
#include <chrono>
#include "cpp/sync_point.h"
#include "recycler/sync_executor.h"
using namespace std::chrono;
namespace doris::cloud {
ObjectStorageResponse ObjStorageClient::delete_objects_recursively_(ObjectStoragePathRef path,
const ObjClientOptions& option,
int64_t expired_time,
size_t batch_size) {
TEST_SYNC_POINT_CALLBACK("ObjStorageClient::delete_objects_recursively_", &batch_size);
size_t num_deleted_objects = 0;
auto start_time = steady_clock::now();
auto list_iter = list_objects(path);
ObjectStorageResponse ret;
std::vector<std::string> keys;
SyncExecutor<int> concurrent_delete_executor(
option.executor,
fmt::format("delete objects under bucket {}, path {}", path.bucket, path.key),
[](const int& ret) { return ret != 0; });
for (auto obj = list_iter->next(); obj.has_value(); obj = list_iter->next()) {
if (expired_time > 0 && obj->mtime_s > expired_time) {
continue;
}
num_deleted_objects++;
keys.emplace_back(std::move(obj->key));
if (keys.size() < batch_size) {
continue;
}
concurrent_delete_executor.add([this, &path, k = std::move(keys), option]() mutable {
return delete_objects(path.bucket, std::move(k), option).ret;
});
}
if (!list_iter->is_valid()) {
bool finished;
concurrent_delete_executor.when_all(&finished);
return {-1};
}
if (!keys.empty()) {
concurrent_delete_executor.add([this, &path, k = std::move(keys), option]() mutable {
return delete_objects(path.bucket, std::move(k), option).ret;
});
}
bool finished = true;
std::vector<int> rets = concurrent_delete_executor.when_all(&finished);
for (int r : rets) {
if (r != 0) {
ret = -1;
}
}
auto elapsed = duration_cast<milliseconds>(steady_clock::now() - start_time).count();
LOG(INFO) << "delete objects under " << path.bucket << "/" << path.key
<< " finished, ret=" << ret.ret << ", finished=" << finished
<< ", num_deleted_objects=" << num_deleted_objects << ", cost=" << elapsed << " ms";
ret = finished ? ret : -1;
return ret;
}
} // namespace doris::cloud