blob: 0219dd2629bf5554af26a203e717a7a27dd9fa57 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <config.h>
#if USE_ROCKSDB
#include "MetadataStorageFromRocksDBTransactionOperations.h"
#include <ranges>
namespace DB
{
namespace ErrorCodes
{
extern const int INVALID_STATE;
}
}
namespace local_engine
{
void throwRockDBErrorNotOk(const rocksdb::Status & status)
{
if (!status.ok())
throw DB::Exception(DB::ErrorCodes::INVALID_STATE, "Access rocksdb failed: {}", status.ToString());
}
bool exist(rocksdb::DB & db, const std::string & path)
{
std::string data;
return tryGetData(db, path, &data);
}
bool tryGetData(rocksdb::DB & db, const std::string & path, std::string * value)
{
auto status = db.Get({}, path, value);
if (status.IsNotFound())
return false;
throwRockDBErrorNotOk(status);
return status.ok();
}
String getData(rocksdb::DB & db, const std::string & path)
{
std::string data;
throwRockDBErrorNotOk(db.Get({}, path, &data));
return data;
}
std::vector<String> listKeys(rocksdb::DB & db, const std::string & path)
{
std::vector<String> result;
auto *it = db.NewIterator({});
for (it->Seek(path); it->Valid() && it->key().starts_with(path); it->Next())
{
if (it->key() == path)
continue;
result.push_back(it->key().ToString());
}
return result;
}
void RocksDBWriteFileOperation::execute(std::unique_lock<DB::SharedMutex> &)
{
auto status = db.Get({}, path, &prev_data);
if (status.IsNotFound())
existed = false;
else
throwRockDBErrorNotOk(status);
db.Put({}, path, data);
}
void RocksDBWriteFileOperation::undo(std::unique_lock<DB::SharedMutex> &)
{
if (existed)
throwRockDBErrorNotOk(db.Put({}, path, prev_data));
else
throwRockDBErrorNotOk(db.Delete({}, path));
}
void RocksDBCreateDirectoryOperation::execute(std::unique_lock<DB::SharedMutex> &)
{
existed = exist(db, path);
if (existed)
return;
throwRockDBErrorNotOk(db.Put({}, path, DIR_DATA));
}
void RocksDBCreateDirectoryOperation::undo(std::unique_lock<DB::SharedMutex> &)
{
if (existed) return;
throwRockDBErrorNotOk(db.Delete({}, path));
}
void RocksDBCreateDirectoryRecursiveOperation::execute(std::unique_lock<DB::SharedMutex> & )
{
namespace fs = std::filesystem;
fs::path p(path);
while (!exist(db, p.string()))
{
paths_created.push_back(p);
if (!p.has_parent_path())
break;
p = p.parent_path();
}
for (const auto & path_to_create : paths_created | std::views::reverse)
throwRockDBErrorNotOk(db.Put({}, path_to_create, RocksDBCreateDirectoryOperation::DIR_DATA));
}
void RocksDBCreateDirectoryRecursiveOperation::undo(std::unique_lock<DB::SharedMutex> & )
{
for (const auto & path_created : paths_created)
throwRockDBErrorNotOk(db.Delete({}, path_created));
}
void RocksDBRemoveDirectoryOperation::execute(std::unique_lock<DB::SharedMutex> &)
{
auto *it = db.NewIterator({});
bool empty_dir = true;
for (it->Seek(path); it->Valid() && it->key().starts_with(path); it->Next())
{
if (it->key() != path && it->key().starts_with(path))
{
empty_dir = false;
break;
}
if (it->key() == path)
existed = true;
}
if (!empty_dir)
{
throw DB::Exception(DB::ErrorCodes::INVALID_STATE, "Directory {} is not empty", path);
}
if (existed)
throwRockDBErrorNotOk(db.Delete({}, path));
}
void RocksDBRemoveDirectoryOperation::undo(std::unique_lock<DB::SharedMutex> &)
{
if (existed)
throwRockDBErrorNotOk(db.Put({}, path, RocksDBCreateDirectoryOperation::DIR_DATA));
}
void RocksDBRemoveRecursiveOperation::execute(std::unique_lock<DB::SharedMutex> &)
{
auto *it = db.NewIterator({});
for (it->Seek(path); it->Valid() && it->key().starts_with(path); it->Next())
{
files.emplace(it->key().ToString(), it->value().ToString());
throwRockDBErrorNotOk(db.Delete({}, it->key()));
}
}
void RocksDBRemoveRecursiveOperation::undo(std::unique_lock<DB::SharedMutex> &)
{
for (const auto & [key, value] : files)
throwRockDBErrorNotOk(db.Put({}, key, value));
}
void RocksDBUnlinkFileOperation::execute(std::unique_lock<DB::SharedMutex> &)
{
prev_data = getData(db, path);
throwRockDBErrorNotOk(db.Delete({}, path));
}
void RocksDBUnlinkFileOperation::undo(std::unique_lock<DB::SharedMutex> &)
{
throwRockDBErrorNotOk(db.Put({}, path, prev_data));
}
}
#endif