feat: implement purge support in DropTable for InMemoryCatalog and SqlCatalog (#744)
When purge=true, delete all metadata files (current + metadata log
entries) before unregistering the table. Old log files are deleted first
so the current metadata file remains as an anchor for retries.
diff --git a/src/iceberg/catalog/memory/in_memory_catalog.cc b/src/iceberg/catalog/memory/in_memory_catalog.cc
index 35593ed..6148ef4 100644
--- a/src/iceberg/catalog/memory/in_memory_catalog.cc
+++ b/src/iceberg/catalog/memory/in_memory_catalog.cc
@@ -22,6 +22,7 @@
#include <algorithm>
#include <iterator>
+#include "iceberg/file_io.h"
#include "iceberg/table.h"
#include "iceberg/table_identifier.h"
#include "iceberg/table_metadata.h"
@@ -511,7 +512,22 @@
Status InMemoryCatalog::DropTable(const TableIdentifier& identifier, bool purge) {
std::unique_lock lock(mutex_);
- // TODO(Guotao): Delete all metadata files if purge is true.
+ if (purge && file_io_) {
+ ICEBERG_ASSIGN_OR_RAISE(auto metadata_location,
+ root_namespace_->GetTableMetadataLocation(identifier));
+ ICEBERG_ASSIGN_OR_RAISE(auto metadata,
+ TableMetadataUtil::Read(*file_io_, metadata_location));
+ // Delete previous metadata files from the log first, so that if deletion
+ // fails and is retried, the current metadata file still exists as an
+ // anchor to locate any remaining old files.
+ std::vector<std::string> files_to_delete;
+ files_to_delete.reserve(metadata->metadata_log.size());
+ for (const auto& entry : metadata->metadata_log) {
+ files_to_delete.push_back(entry.metadata_file);
+ }
+ std::ignore = file_io_->DeleteFiles(files_to_delete);
+ std::ignore = file_io_->DeleteFile(metadata_location);
+ }
return root_namespace_->UnregisterTable(identifier);
}
diff --git a/src/iceberg/catalog/sql/sql_catalog.cc b/src/iceberg/catalog/sql/sql_catalog.cc
index eb066ba..cfe155f 100644
--- a/src/iceberg/catalog/sql/sql_catalog.cc
+++ b/src/iceberg/catalog/sql/sql_catalog.cc
@@ -24,6 +24,7 @@
#include <string>
#include "iceberg/catalog/sql/config.h"
+#include "iceberg/file_io.h"
#include "iceberg/table.h"
#include "iceberg/table_identifier.h"
#include "iceberg/table_metadata.h"
@@ -507,11 +508,25 @@
Status SqlCatalog::DropTable(const TableIdentifier& identifier, bool purge) {
ICEBERG_RETURN_UNEXPECTED(ValidateTableIdentifier(identifier));
- if (purge) {
- // TODO(zhjwpku): Delete the table data and metadata files when purge is requested.
- }
const std::string ns_str = NamespaceToString(identifier.ns);
+
+ if (purge && file_io_) {
+ ICEBERG_ASSIGN_OR_RAISE(auto metadata_location,
+ store_->GetTableMetadataLocation(ns_str, identifier.name));
+ if (metadata_location.has_value()) {
+ ICEBERG_ASSIGN_OR_RAISE(auto metadata,
+ TableMetadataUtil::Read(*file_io_, *metadata_location));
+ // Delete previous metadata files from the log first, so that if deletion
+ // fails and is retried, the current metadata file still exists as an
+ // anchor to locate any remaining old files.
+ for (const auto& entry : metadata->metadata_log) {
+ std::ignore = file_io_->DeleteFile(entry.metadata_file);
+ }
+ std::ignore = file_io_->DeleteFile(*metadata_location);
+ }
+ }
+
ICEBERG_ASSIGN_OR_RAISE(auto affected, store_->DeleteTable(ns_str, identifier.name));
if (affected == 0) {
return NoSuchTable("Table does not exist: {}", identifier.ToString());