blob: d6cc786f5bd32a9ad5fa6fdf8aa6137e03c211c0 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.gravitino.listener;
import java.util.Map;
import org.apache.gravitino.Catalog;
import org.apache.gravitino.CatalogChange;
import org.apache.gravitino.NameIdentifier;
import org.apache.gravitino.Namespace;
import org.apache.gravitino.catalog.CatalogDispatcher;
import org.apache.gravitino.exceptions.CatalogAlreadyExistsException;
import org.apache.gravitino.exceptions.NoSuchCatalogException;
import org.apache.gravitino.exceptions.NoSuchMetalakeException;
import org.apache.gravitino.listener.api.event.AlterCatalogEvent;
import org.apache.gravitino.listener.api.event.AlterCatalogFailureEvent;
import org.apache.gravitino.listener.api.event.CreateCatalogEvent;
import org.apache.gravitino.listener.api.event.CreateCatalogFailureEvent;
import org.apache.gravitino.listener.api.event.DropCatalogEvent;
import org.apache.gravitino.listener.api.event.DropCatalogFailureEvent;
import org.apache.gravitino.listener.api.event.ListCatalogEvent;
import org.apache.gravitino.listener.api.event.ListCatalogFailureEvent;
import org.apache.gravitino.listener.api.event.LoadCatalogEvent;
import org.apache.gravitino.listener.api.event.LoadCatalogFailureEvent;
import org.apache.gravitino.listener.api.info.CatalogInfo;
import org.apache.gravitino.utils.PrincipalUtils;
/**
* {@code CatalogEventDispatcher} is a decorator for {@link CatalogDispatcher} that not only
* delegates catalog operations to the underlying catalog dispatcher but also dispatches
* corresponding events to an {@link org.apache.gravitino.listener.EventBus} after each operation is
* completed. This allows for event-driven workflows or monitoring of catalog operations.
*/
public class CatalogEventDispatcher implements CatalogDispatcher {
private final EventBus eventBus;
private final CatalogDispatcher dispatcher;
/**
* Constructs a CatalogEventDispatcher with a specified EventBus and CatalogDispatcher.
*
* @param eventBus The EventBus to which events will be dispatched.
* @param dispatcher The underlying {@link CatalogDispatcher} that will perform the actual catalog
* operations.
*/
public CatalogEventDispatcher(EventBus eventBus, CatalogDispatcher dispatcher) {
this.eventBus = eventBus;
this.dispatcher = dispatcher;
}
@Override
public NameIdentifier[] listCatalogs(Namespace namespace) throws NoSuchMetalakeException {
try {
NameIdentifier[] nameIdentifiers = dispatcher.listCatalogs(namespace);
eventBus.dispatchEvent(new ListCatalogEvent(PrincipalUtils.getCurrentUserName(), namespace));
return nameIdentifiers;
} catch (Exception e) {
eventBus.dispatchEvent(
new ListCatalogFailureEvent(PrincipalUtils.getCurrentUserName(), e, namespace));
throw e;
}
}
@Override
public Catalog[] listCatalogsInfo(Namespace namespace) throws NoSuchMetalakeException {
try {
Catalog[] catalogs = dispatcher.listCatalogsInfo(namespace);
eventBus.dispatchEvent(new ListCatalogEvent(PrincipalUtils.getCurrentUserName(), namespace));
return catalogs;
} catch (Exception e) {
eventBus.dispatchEvent(
new ListCatalogFailureEvent(PrincipalUtils.getCurrentUserName(), e, namespace));
throw e;
}
}
@Override
public Catalog loadCatalog(NameIdentifier ident) throws NoSuchCatalogException {
try {
Catalog catalog = dispatcher.loadCatalog(ident);
eventBus.dispatchEvent(
new LoadCatalogEvent(
PrincipalUtils.getCurrentUserName(), ident, new CatalogInfo(catalog)));
return catalog;
} catch (Exception e) {
eventBus.dispatchEvent(
new LoadCatalogFailureEvent(PrincipalUtils.getCurrentUserName(), ident, e));
throw e;
}
}
@Override
public Catalog createCatalog(
NameIdentifier ident,
Catalog.Type type,
String provider,
String comment,
Map<String, String> properties)
throws NoSuchMetalakeException, CatalogAlreadyExistsException {
try {
Catalog catalog = dispatcher.createCatalog(ident, type, provider, comment, properties);
eventBus.dispatchEvent(
new CreateCatalogEvent(
PrincipalUtils.getCurrentUserName(), ident, new CatalogInfo(catalog)));
return catalog;
} catch (Exception e) {
CatalogInfo createCatalogRequest =
new CatalogInfo(ident.name(), type, provider, comment, properties, null);
eventBus.dispatchEvent(
new CreateCatalogFailureEvent(
PrincipalUtils.getCurrentUserName(), ident, e, createCatalogRequest));
throw e;
}
}
@Override
public Catalog alterCatalog(NameIdentifier ident, CatalogChange... changes)
throws NoSuchCatalogException, IllegalArgumentException {
try {
Catalog catalog = dispatcher.alterCatalog(ident, changes);
eventBus.dispatchEvent(
new AlterCatalogEvent(
PrincipalUtils.getCurrentUserName(), ident, changes, new CatalogInfo(catalog)));
return catalog;
} catch (Exception e) {
eventBus.dispatchEvent(
new AlterCatalogFailureEvent(PrincipalUtils.getCurrentUserName(), ident, e, changes));
throw e;
}
}
@Override
public boolean dropCatalog(NameIdentifier ident) {
try {
boolean isExists = dispatcher.dropCatalog(ident);
eventBus.dispatchEvent(
new DropCatalogEvent(PrincipalUtils.getCurrentUserName(), ident, isExists));
return isExists;
} catch (Exception e) {
eventBus.dispatchEvent(
new DropCatalogFailureEvent(PrincipalUtils.getCurrentUserName(), ident, e));
throw e;
}
}
@Override
public void testConnection(
NameIdentifier ident,
Catalog.Type type,
String provider,
String comment,
Map<String, String> properties)
throws Exception {
// TODO: Support event dispatching for testConnection
dispatcher.testConnection(ident, type, provider, comment, properties);
}
}