blob: a880f94f4385aeef9f7fbb59783e00b2f69eb009 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.inmemory;
import java.io.Closeable;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;
import org.apache.iceberg.BaseMetastoreTableOperations;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableOperations;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.SupportsNamespaces;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
import org.apache.iceberg.exceptions.NoSuchNamespaceException;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.exceptions.NoSuchViewException;
import org.apache.iceberg.io.CloseableGroup;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.relocated.com.google.common.base.Joiner;
import org.apache.iceberg.relocated.com.google.common.base.Objects;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.view.BaseMetastoreViewCatalog;
import org.apache.iceberg.view.BaseViewOperations;
import org.apache.iceberg.view.ViewMetadata;
import org.apache.iceberg.view.ViewUtil;
/**
* Catalog implementation that uses in-memory data-structures to store the namespaces and tables.
* This class doesn't touch external resources and can be utilized to write unit tests without side
* effects. It uses {@link InMemoryFileIO}.
*/
public class InMemoryCatalog extends BaseMetastoreViewCatalog
    implements SupportsNamespaces, Closeable {
  private static final Joiner SLASH = Joiner.on("/");
  private static final Joiner DOT = Joiner.on(".");

  // Namespace -> its properties. Every value stored here is an immutable copy.
  private final ConcurrentMap<Namespace, Map<String, String>> namespaces;
  // Table identifier -> location of the table's current metadata file.
  private final ConcurrentMap<TableIdentifier, String> tables;
  // View identifier -> location of the view's current metadata file.
  private final ConcurrentMap<TableIdentifier, String> views;

  // The fields below are assigned in initialize(...) and are null until it has been called.
  private FileIO io;
  private String catalogName;
  private String warehouseLocation;
  private CloseableGroup closeableGroup;

  /** Creates an empty catalog; {@link #initialize(String, Map)} must be called before use. */
  public InMemoryCatalog() {
    this.namespaces = Maps.newConcurrentMap();
    this.tables = Maps.newConcurrentMap();
    this.views = Maps.newConcurrentMap();
  }

  /** Returns the name given to {@link #initialize(String, Map)}, or the class name if none. */
  @Override
  public String name() {
    return catalogName;
  }

  /**
   * Configures the catalog name, the warehouse location and a fresh {@link InMemoryFileIO}.
   *
   * @param name catalog name; when {@code null} the simple class name is used
   * @param properties catalog properties; only {@link CatalogProperties#WAREHOUSE_LOCATION} is
   *     read here, defaulting to the empty string
   */
  @Override
  public void initialize(String name, Map<String, String> properties) {
    this.catalogName = name != null ? name : InMemoryCatalog.class.getSimpleName();

    // Strip trailing slashes so that locations joined with SLASH never contain "//".
    String warehouse = properties.getOrDefault(CatalogProperties.WAREHOUSE_LOCATION, "");
    this.warehouseLocation = warehouse.replaceAll("/*$", "");
    this.io = new InMemoryFileIO();
    this.closeableGroup = new CloseableGroup();
    closeableGroup.addCloseable(metricsReporter());
    closeableGroup.setSuppressCloseFailure(true);
  }

  @Override
  protected TableOperations newTableOps(TableIdentifier tableIdentifier) {
    return new InMemoryTableOperations(io, tableIdentifier);
  }

  /** Returns {@code <warehouse>/<namespace levels joined by '/'>/<table name>}. */
  @Override
  protected String defaultWarehouseLocation(TableIdentifier tableIdentifier) {
    return SLASH.join(
        defaultNamespaceLocation(tableIdentifier.namespace()), tableIdentifier.name());
  }

  /** Returns the warehouse location itself for the empty namespace, else a sub-path of it. */
  private String defaultNamespaceLocation(Namespace namespace) {
    if (namespace.isEmpty()) {
      return warehouseLocation;
    }

    return SLASH.join(warehouseLocation, SLASH.join(namespace.levels()));
  }

  /**
   * Removes a table from the catalog.
   *
   * @param tableIdentifier table to drop
   * @param purge when {@code true}, the table's data is also deleted through {@link
   *     CatalogUtil#dropTableData}
   * @return {@code true} if the table was registered and has been removed, {@code false} otherwise
   */
  @Override
  public boolean dropTable(TableIdentifier tableIdentifier, boolean purge) {
    TableOperations ops = newTableOps(tableIdentifier);

    // The metadata has to be read before the entry is removed; it is only needed when purging.
    TableMetadata lastMetadata = purge ? ops.current() : null;

    synchronized (this) {
      if (null == tables.remove(tableIdentifier)) {
        return false;
      }
    }

    if (lastMetadata != null) {
      CatalogUtil.dropTableData(ops.io(), lastMetadata);
    }

    return true;
  }

  /**
   * Lists tables, sorted by their string form.
   *
   * @param namespace namespace to list; the empty namespace lists every table in the catalog
   * @throws NoSuchNamespaceException if a non-empty namespace is not registered
   */
  @Override
  public List<TableIdentifier> listTables(Namespace namespace) {
    if (!namespaceExists(namespace) && !namespace.isEmpty()) {
      throw new NoSuchNamespaceException(
          "Cannot list tables for namespace. Namespace does not exist: %s", namespace);
    }

    return tables.keySet().stream()
        .filter(t -> namespace.isEmpty() || t.namespace().equals(namespace))
        .sorted(Comparator.comparing(TableIdentifier::toString))
        .collect(Collectors.toList());
  }

  /**
   * Re-registers a table under a new identifier. Renaming to the same identifier is a no-op.
   *
   * @throws NoSuchNamespaceException if the target namespace is not registered
   * @throws NoSuchTableException if {@code from} is not a registered table
   * @throws AlreadyExistsException if a table or a view is already registered as {@code to}
   */
  @Override
  public void renameTable(TableIdentifier from, TableIdentifier to) {
    if (from.equals(to)) {
      return;
    }

    synchronized (this) {
      if (!namespaceExists(to.namespace())) {
        throw new NoSuchNamespaceException(
            "Cannot rename %s to %s. Namespace does not exist: %s", from, to, to.namespace());
      }

      String fromLocation = tables.get(from);
      if (null == fromLocation) {
        throw new NoSuchTableException("Cannot rename %s to %s. Table does not exist", from, to);
      }

      if (tables.containsKey(to)) {
        throw new AlreadyExistsException("Cannot rename %s to %s. Table already exists", from, to);
      }

      if (views.containsKey(to)) {
        throw new AlreadyExistsException("Cannot rename %s to %s. View already exists", from, to);
      }

      tables.put(to, fromLocation);
      tables.remove(from);
    }
  }

  /** Registers a namespace without properties. */
  @Override
  public void createNamespace(Namespace namespace) {
    createNamespace(namespace, Collections.emptyMap());
  }

  /**
   * Registers a namespace with an immutable copy of the given properties.
   *
   * @throws AlreadyExistsException if the namespace is already registered
   */
  @Override
  public void createNamespace(Namespace namespace, Map<String, String> metadata) {
    synchronized (this) {
      if (namespaceExists(namespace)) {
        throw new AlreadyExistsException(
            "Cannot create namespace %s. Namespace already exists", namespace);
      }

      namespaces.put(namespace, ImmutableMap.copyOf(metadata));
    }
  }

  @Override
  public boolean namespaceExists(Namespace namespace) {
    return namespaces.containsKey(namespace);
  }

  /**
   * Removes a namespace that contains neither tables nor views.
   *
   * @return {@code true} if the namespace was registered and has been removed, {@code false} if
   *     it was not registered
   * @throws NamespaceNotEmptyException if a table or a view is still registered directly in the
   *     namespace
   */
  @Override
  public boolean dropNamespace(Namespace namespace) throws NamespaceNotEmptyException {
    synchronized (this) {
      if (!namespaceExists(namespace)) {
        return false;
      }

      List<TableIdentifier> tableIdentifiers = listTables(namespace);
      if (!tableIdentifiers.isEmpty()) {
        throw new NamespaceNotEmptyException(
            "Namespace %s is not empty. Contains %d table(s).", namespace, tableIdentifiers.size());
      }

      // Views live in their own map; without this check they would be left behind under a
      // namespace that no longer exists.
      List<TableIdentifier> viewIdentifiers = listViews(namespace);
      if (!viewIdentifiers.isEmpty()) {
        throw new NamespaceNotEmptyException(
            "Namespace %s is not empty. Contains %d view(s).", namespace, viewIdentifiers.size());
      }

      return namespaces.remove(namespace) != null;
    }
  }

  /**
   * Adds or overwrites namespace properties; on key collisions the new value wins.
   *
   * @return always {@code true}
   * @throws NoSuchNamespaceException if the namespace is not registered
   */
  @Override
  public boolean setProperties(Namespace namespace, Map<String, String> properties)
      throws NoSuchNamespaceException {
    synchronized (this) {
      if (!namespaceExists(namespace)) {
        throw new NoSuchNamespaceException("Namespace does not exist: %s", namespace);
      }

      namespaces.computeIfPresent(
          namespace,
          (k, v) ->
              ImmutableMap.<String, String>builder()
                  .putAll(v)
                  .putAll(properties)
                  .buildKeepingLast());

      return true;
    }
  }

  /**
   * Removes the given keys from a namespace's properties; keys that are not set are ignored.
   *
   * @return always {@code true}
   * @throws NoSuchNamespaceException if the namespace is not registered
   */
  @Override
  public boolean removeProperties(Namespace namespace, Set<String> properties)
      throws NoSuchNamespaceException {
    synchronized (this) {
      if (!namespaceExists(namespace)) {
        throw new NoSuchNamespaceException("Namespace does not exist: %s", namespace);
      }

      namespaces.computeIfPresent(
          namespace,
          (k, v) -> {
            Map<String, String> newProperties = Maps.newHashMap(v);
            properties.forEach(newProperties::remove);
            return ImmutableMap.copyOf(newProperties);
          });

      return true;
    }
  }

  /**
   * Returns an immutable copy of a namespace's properties.
   *
   * @throws NoSuchNamespaceException if the namespace is not registered
   */
  @Override
  public Map<String, String> loadNamespaceMetadata(Namespace namespace)
      throws NoSuchNamespaceException {
    Map<String, String> properties = namespaces.get(namespace);
    if (properties == null) {
      throw new NoSuchNamespaceException("Namespace does not exist: %s", namespace);
    }

    return ImmutableMap.copyOf(properties);
  }

  /** Lists the distinct first levels of all registered namespaces, sorted by name. */
  @Override
  public List<Namespace> listNamespaces() {
    return namespaces.keySet().stream()
        .filter(n -> !n.isEmpty())
        .map(n -> n.level(0))
        .distinct()
        .sorted()
        .map(Namespace::of)
        .collect(Collectors.toList());
  }

  /**
   * Lists the direct children of a namespace, sorted by their dot-joined name.
   *
   * <p>A child is reported when it, or any namespace nested below it, is registered. The parent
   * itself does not have to be registered as long as at least one descendant is. The empty (root)
   * namespace is always accepted and yields an empty list when nothing is registered.
   *
   * @param namespace parent namespace; the empty namespace lists the top-level namespaces
   * @throws NoSuchNamespaceException if a non-empty namespace is neither registered nor an
   *     ancestor of a registered namespace
   */
  @Override
  public List<Namespace> listNamespaces(Namespace namespace) throws NoSuchNamespaceException {
    final int parentDepth = namespace.levels().length;

    // Compare level by level instead of on the dot-joined name: a single level may itself contain
    // a dot, which a string prefix test cannot tell apart from a level separator.
    List<Namespace> descendants =
        namespaces.keySet().stream()
            .filter(n -> isStrictDescendant(n, namespace))
            .collect(Collectors.toList());

    // If the namespace does not exist and the namespace is not a prefix of another namespace,
    // throw an exception. The root namespace is exempt, matching listTables and listViews.
    if (!namespace.isEmpty() && !namespaces.containsKey(namespace) && descendants.isEmpty()) {
      throw new NoSuchNamespaceException("Namespace does not exist: %s", namespace);
    }

    return descendants.stream()
        // List only the child-namespaces roots. Every descendant has more than parentDepth
        // levels, so the copy never pads with null.
        .map(n -> Namespace.of(Arrays.copyOf(n.levels(), parentDepth + 1)))
        .distinct()
        .sorted(Comparator.comparing(n -> DOT.join(n.levels())))
        .collect(Collectors.toList());
  }

  /** Returns whether {@code candidate} is nested (at any depth) strictly below {@code ancestor}. */
  private static boolean isStrictDescendant(Namespace candidate, Namespace ancestor) {
    int ancestorDepth = ancestor.levels().length;
    if (candidate.levels().length <= ancestorDepth) {
      return false;
    }

    for (int i = 0; i < ancestorDepth; i += 1) {
      if (!candidate.level(i).equals(ancestor.level(i))) {
        return false;
      }
    }

    return true;
  }

  /** Closes the resources registered in {@code initialize} and forgets all catalog content. */
  @Override
  public void close() throws IOException {
    // closeableGroup is only created by initialize(...); closing a catalog that was never
    // initialized must not fail.
    if (closeableGroup != null) {
      closeableGroup.close();
    }

    namespaces.clear();
    tables.clear();
    views.clear();
  }

  /**
   * Lists views, sorted by their string form.
   *
   * @param namespace namespace to list; the empty namespace lists every view in the catalog
   * @throws NoSuchNamespaceException if a non-empty namespace is not registered
   */
  @Override
  public List<TableIdentifier> listViews(Namespace namespace) {
    if (!namespaceExists(namespace) && !namespace.isEmpty()) {
      throw new NoSuchNamespaceException(
          "Cannot list views for namespace. Namespace does not exist: %s", namespace);
    }

    return views.keySet().stream()
        .filter(v -> namespace.isEmpty() || v.namespace().equals(namespace))
        .sorted(Comparator.comparing(TableIdentifier::toString))
        .collect(Collectors.toList());
  }

  @Override
  protected InMemoryViewOperations newViewOps(TableIdentifier identifier) {
    return new InMemoryViewOperations(io, identifier);
  }

  /** Removes a view; returns {@code true} if it was registered. */
  @Override
  public boolean dropView(TableIdentifier identifier) {
    synchronized (this) {
      return null != views.remove(identifier);
    }
  }

  /**
   * Re-registers a view under a new identifier. Renaming to the same identifier is a no-op.
   *
   * @throws NoSuchNamespaceException if the target namespace is not registered
   * @throws NoSuchViewException if {@code from} is not a registered view
   * @throws AlreadyExistsException if a table or a view is already registered as {@code to}
   */
  @Override
  public void renameView(TableIdentifier from, TableIdentifier to) {
    if (from.equals(to)) {
      return;
    }

    synchronized (this) {
      if (!namespaceExists(to.namespace())) {
        throw new NoSuchNamespaceException(
            "Cannot rename %s to %s. Namespace does not exist: %s", from, to, to.namespace());
      }

      String fromViewLocation = views.get(from);
      if (null == fromViewLocation) {
        throw new NoSuchViewException("Cannot rename %s to %s. View does not exist", from, to);
      }

      if (tables.containsKey(to)) {
        throw new AlreadyExistsException("Cannot rename %s to %s. Table already exists", from, to);
      }

      if (views.containsKey(to)) {
        throw new AlreadyExistsException("Cannot rename %s to %s. View already exists", from, to);
      }

      views.put(to, fromViewLocation);
      views.remove(from);
    }
  }

  /** Table operations that keep the current metadata location in the catalog's table map. */
  private class InMemoryTableOperations extends BaseMetastoreTableOperations {
    private final FileIO fileIO;
    private final TableIdentifier tableIdentifier;
    private final String fullTableName;

    InMemoryTableOperations(FileIO fileIO, TableIdentifier tableIdentifier) {
      this.fileIO = fileIO;
      this.tableIdentifier = tableIdentifier;
      this.fullTableName = fullTableName(catalogName, tableIdentifier);
    }

    @Override
    public void doRefresh() {
      String latestLocation = tables.get(tableIdentifier);
      if (latestLocation == null) {
        // The table is not registered, so there is no metadata file to load.
        disableRefresh();
      } else {
        refreshFromMetadataLocation(latestLocation);
      }
    }

    /**
     * Swaps the registered metadata location from the one of {@code base} to the newly written
     * one, failing if the registered location is not the expected one.
     *
     * @throws NoSuchNamespaceException when creating a table in an unregistered namespace
     * @throws AlreadyExistsException if a view has the same identifier, or the table is being
     *     created ({@code base == null}) but is already registered
     * @throws NoSuchTableException if the table is no longer registered
     * @throws CommitFailedException if the registered location differs from the base location
     */
    @Override
    public void doCommit(TableMetadata base, TableMetadata metadata) {
      String newLocation = writeNewMetadataIfRequired(base == null, metadata);
      String oldLocation = base == null ? null : base.metadataFileLocation();

      synchronized (InMemoryCatalog.this) {
        if (null == base && !namespaceExists(tableIdentifier.namespace())) {
          throw new NoSuchNamespaceException(
              "Cannot create table %s. Namespace does not exist: %s",
              tableIdentifier, tableIdentifier.namespace());
        }

        if (views.containsKey(tableIdentifier)) {
          throw new AlreadyExistsException(
              "View with same name already exists: %s", tableIdentifier);
        }

        tables.compute(
            tableIdentifier,
            (k, existingLocation) -> {
              if (!Objects.equal(existingLocation, oldLocation)) {
                if (null == base) {
                  throw new AlreadyExistsException("Table already exists: %s", tableName());
                }

                if (null == existingLocation) {
                  throw new NoSuchTableException("Table does not exist: %s", tableName());
                }

                throw new CommitFailedException(
                    "Cannot commit to table %s metadata location from %s to %s "
                        + "because it has been concurrently modified to %s",
                    tableIdentifier, oldLocation, newLocation, existingLocation);
              }

              return newLocation;
            });
      }
    }

    @Override
    public FileIO io() {
      return fileIO;
    }

    @Override
    protected String tableName() {
      return fullTableName;
    }
  }

  /** View operations that keep the current metadata location in the catalog's view map. */
  private class InMemoryViewOperations extends BaseViewOperations {
    private final FileIO io;
    private final TableIdentifier identifier;
    private final String fullViewName;

    InMemoryViewOperations(FileIO io, TableIdentifier identifier) {
      this.io = io;
      this.identifier = identifier;
      this.fullViewName = ViewUtil.fullViewName(catalogName, identifier);
    }

    @Override
    public void doRefresh() {
      String latestLocation = views.get(identifier);
      if (latestLocation == null) {
        // The view is not registered, so there is no metadata file to load.
        disableRefresh();
      } else {
        refreshFromMetadataLocation(latestLocation);
      }
    }

    /**
     * Swaps the registered metadata location from the current one to the newly written one,
     * failing if the registered location is not the expected one.
     *
     * @throws NoSuchNamespaceException when creating a view in an unregistered namespace
     * @throws AlreadyExistsException if a table has the same identifier, or the view is being
     *     created ({@code base == null}) but is already registered
     * @throws NoSuchViewException if the view is no longer registered
     * @throws CommitFailedException if the registered location differs from the expected one
     */
    @Override
    public void doCommit(ViewMetadata base, ViewMetadata metadata) {
      String newLocation = writeNewMetadataIfRequired(metadata);
      String oldLocation = base == null ? null : currentMetadataLocation();

      synchronized (InMemoryCatalog.this) {
        if (null == base && !namespaceExists(identifier.namespace())) {
          throw new NoSuchNamespaceException(
              "Cannot create view %s. Namespace does not exist: %s",
              identifier, identifier.namespace());
        }

        if (tables.containsKey(identifier)) {
          throw new AlreadyExistsException("Table with same name already exists: %s", identifier);
        }

        views.compute(
            identifier,
            (k, existingLocation) -> {
              if (!Objects.equal(existingLocation, oldLocation)) {
                if (null == base) {
                  throw new AlreadyExistsException("View already exists: %s", identifier);
                }

                if (null == existingLocation) {
                  throw new NoSuchViewException("View does not exist: %s", identifier);
                }

                throw new CommitFailedException(
                    "Cannot commit to view %s metadata location from %s to %s "
                        + "because it has been concurrently modified to %s",
                    identifier, oldLocation, newLocation, existingLocation);
              }

              return newLocation;
            });
      }
    }

    @Override
    public FileIO io() {
      return io;
    }

    @Override
    protected String viewName() {
      return fullViewName;
    }
  }
}