blob: 8c39e698f26036d1abe49334a491bd7723933884 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.actions;
import org.apache.iceberg.Table;
import org.apache.iceberg.common.DynConstructors;
import org.apache.iceberg.common.DynMethods;
import org.apache.spark.sql.SparkSession;
public class Actions {
/*
We load the actual implementation of Actions via reflection to allow for differences
between the major Spark APIs while still defining the API in this class.
*/
private static final String IMPL_NAME = "SparkActions";
private static DynConstructors.Ctor<Actions> implConstructor;
private static String implClass() {
return Actions.class.getPackage().getName() + "." + IMPL_NAME;
}
private static DynConstructors.Ctor<Actions> actionConstructor() {
if (implConstructor == null) {
String className = implClass();
try {
implConstructor =
DynConstructors.builder().hiddenImpl(className, SparkSession.class, Table.class).buildChecked();
} catch (NoSuchMethodException e) {
throw new IllegalArgumentException("Cannot find appropriate Actions implementation on the classpath.", e);
}
}
return implConstructor;
}
private SparkSession spark;
private Table table;
protected Actions(SparkSession spark, Table table) {
this.spark = spark;
this.table = table;
}
public static Actions forTable(SparkSession spark, Table table) {
return actionConstructor().newInstance(spark, table);
}
public static Actions forTable(Table table) {
return forTable(SparkSession.active(), table);
}
public RemoveOrphanFilesAction removeOrphanFiles() {
return new RemoveOrphanFilesAction(spark, table);
}
public RewriteManifestsAction rewriteManifests() {
return new RewriteManifestsAction(spark, table);
}
public RewriteDataFilesAction rewriteDataFiles() {
return new RewriteDataFilesAction(spark, table);
}
public ExpireSnapshotsAction expireSnapshots() {
return new ExpireSnapshotsAction(spark, table);
}
/**
* Converts the provided table into an Iceberg table in place. The table will no longer be accessible by it's
* previous implementation
*
* @param tableName Table to be converted
* @return {@link CreateAction} to perform migration
*/
public static CreateAction migrate(String tableName) {
try {
return DynMethods.builder("migrate")
.impl(implClass(), String.class).buildStaticChecked()
.invoke(tableName);
} catch (NoSuchMethodException ex) {
throw new UnsupportedOperationException("Migrate is not implemented for this version of Spark");
}
}
/**
* Converts the provided table into an Iceberg table in place. The table will no longer be accessible by it's
* previous implementation
*
* @param tableName Table to be converted
* @param spark Spark session to use for looking up table
* @return {@link CreateAction} to perform migration
*/
public static CreateAction migrate(SparkSession spark, String tableName) {
try {
return DynMethods.builder("migrate")
.impl(implClass(), SparkSession.class, String.class).buildStaticChecked()
.invoke(spark, tableName);
} catch (NoSuchMethodException ex) {
throw new UnsupportedOperationException("Migrate is not implemented for this version of Spark");
}
}
/**
* Creates an independent Iceberg table based on a given table. The new Iceberg table can be altered, appended or
* deleted without causing any change to the original. New data and metadata will be created in the default
* location for tables of this name in the destination catalog.
*
* @param sourceTable Original table which is the basis for the new Iceberg table
* @param destTable New Iceberg table being created
* @return {@link SnapshotAction} to perform snapshot
*/
public static SnapshotAction snapshot(SparkSession spark, String sourceTable, String destTable) {
try {
return DynMethods.builder("snapshot")
.impl(implClass(), SparkSession.class, String.class, String.class).buildStaticChecked()
.invoke(spark, sourceTable, destTable);
} catch (NoSuchMethodException ex) {
throw new UnsupportedOperationException("Snapshot is not implemented for this version of Spark");
}
}
/**
* Creates an independent Iceberg table based on a given table. The new Iceberg table can be altered, appended or
* deleted without causing any change to the original. New data and metadata will be created in the default
* location for tables of this name in the destination catalog.
*
* @param sourceTable Original table which is the basis for the new Iceberg table
* @param destTable New Iceberg table being created
* @return {@link SnapshotAction} to perform snapshot
*/
public static SnapshotAction snapshot(String sourceTable, String destTable) {
try {
return DynMethods.builder("snapshot")
.impl(implClass(), String.class, String.class).buildStaticChecked()
.invoke(sourceTable, destTable);
} catch (NoSuchMethodException ex) {
throw new UnsupportedOperationException("Snapshot is not implemented for this version of Spark");
}
}
protected SparkSession spark() {
return spark;
}
protected Table table() {
return table;
}
}