/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.planner.sql.handlers;
import org.apache.calcite.schema.Schema;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.schema.Table;
import org.apache.calcite.sql.SqlNode;
import org.apache.commons.io.IOUtils;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.physical.PhysicalPlan;
import org.apache.drill.exec.planner.sql.DirectPlan;
import org.apache.drill.exec.planner.sql.SchemaUtilities;
import org.apache.drill.exec.planner.sql.parser.SqlCreateType;
import org.apache.drill.exec.planner.sql.parser.SqlSchema;
import org.apache.drill.exec.record.metadata.ColumnMetadata;
import org.apache.drill.exec.record.metadata.TupleMetadata;
import org.apache.drill.exec.record.metadata.TupleSchema;
import org.apache.drill.exec.record.metadata.schema.InlineSchemaProvider;
import org.apache.drill.exec.record.metadata.schema.PathSchemaProvider;
import org.apache.drill.exec.record.metadata.schema.SchemaContainer;
import org.apache.drill.exec.record.metadata.schema.SchemaProvider;
import org.apache.drill.exec.record.metadata.schema.SchemaProviderFactory;
import org.apache.drill.exec.record.metadata.schema.StorageProperties;
import org.apache.drill.exec.store.AbstractSchema;
import org.apache.drill.exec.store.StorageStrategy;
import org.apache.drill.exec.store.dfs.WorkspaceSchemaFactory;
import org.apache.drill.exec.util.ImpersonationUtil;
import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
* Parent class for CREATE / DROP / DESCRIBE / ALTER SCHEMA handlers.
* Contains common logic for resolving the workspace and producing error results.
*/
public abstract class SchemaHandler extends DefaultSqlHandler {
static final Logger logger = LoggerFactory.getLogger(SchemaHandler.class);
SchemaHandler(SqlHandlerConfig config) {
super(config);
}
public WorkspaceSchemaFactory.WorkspaceSchema getWorkspaceSchema(List<String> tableSchema, String tableName) {
SchemaPlus defaultSchema = config.getConverter().getDefaultSchema();
AbstractSchema temporarySchema = SchemaUtilities.resolveToTemporarySchema(tableSchema, defaultSchema, context.getConfig());
if (context.getSession().isTemporaryTable(temporarySchema, context.getConfig(), tableName)) {
produceErrorResult(String.format("Indicated table [%s] is temporary table", tableName), true);
}
AbstractSchema drillSchema = SchemaUtilities.resolveToMutableDrillSchema(defaultSchema, tableSchema);
Table table = SqlHandlerUtil.getTableFromSchema(drillSchema, tableName);
if (table == null || table.getJdbcTableType() != Schema.TableType.TABLE) {
produceErrorResult(String.format("Table [%s] was not found", tableName), true);
}
if (!(drillSchema instanceof WorkspaceSchemaFactory.WorkspaceSchema)) {
produceErrorResult(String.format("Table [`%s`.`%s`] must belong to file storage plugin",
drillSchema.getFullSchemaName(), tableName), true);
}
Preconditions.checkState(drillSchema instanceof WorkspaceSchemaFactory.WorkspaceSchema);
return (WorkspaceSchemaFactory.WorkspaceSchema) drillSchema;
}
PhysicalPlan produceErrorResult(String message, boolean doFail) {
if (doFail) {
throw UserException.validationError().message(message).build(logger);
} else {
return DirectPlan.createDirectPlan(context, false, message);
}
}
/**
* Provides the storage strategy that creates the schema file
* with the same permissions as used for persistent tables.
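* <p>The umask is taken from the {@code exec.persistent_table.umask} option
* (default {@code 002}).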
*
* @return storage strategy
*/
StorageStrategy getStorageStrategy() {
return new StorageStrategy(context.getOption(
ExecConstants.PERSISTENT_TABLE_UMASK).string_val, false);
}
/**
* CREATE SCHEMA command handler.
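*
* <p>Illustrative example of a statement this handler processes (the table name is hypothetical):
* <pre>{@code
* CREATE OR REPLACE SCHEMA (id INT NOT NULL, name VARCHAR) FOR TABLE dfs.tmp.`nation`
* }</pre>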
*/
public static class Create extends SchemaHandler {
public Create(SqlHandlerConfig config) {
super(config);
}
@Override
public PhysicalPlan getPlan(SqlNode sqlNode) {
SqlSchema.Create sqlCall = (SqlSchema.Create) sqlNode;
String schemaString = getSchemaString(sqlCall);
String schemaSource = sqlCall.hasTable() ? sqlCall.getTable().toString() : sqlCall.getPath();
try {
SchemaProvider schemaProvider = SchemaProviderFactory.create(sqlCall, this);
if (schemaProvider.exists()) {
if (SqlCreateType.OR_REPLACE == sqlCall.getSqlCreateType()) {
schemaProvider.delete();
} else {
return produceErrorResult(String.format("Schema already exists for [%s]", schemaSource), true);
}
}
StorageProperties storageProperties = StorageProperties.builder()
.storageStrategy(getStorageStrategy())
.overwrite(false)
.build();
schemaProvider.store(schemaString, sqlCall.getProperties(), storageProperties);
return DirectPlan.createDirectPlan(context, true, String.format("Created schema for [%s]", schemaSource));
} catch (IOException e) {
throw UserException.resourceError(e)
.message(e.getMessage())
.addContext("Error while preparing / creating schema for [%s]", schemaSource)
.build(logger);
} catch (IllegalArgumentException e) {
throw UserException.validationError(e)
.message(e.getMessage())
.addContext("Error while preparing / creating schema for [%s]", schemaSource)
.build(logger);
}
}
/**
* If the raw schema was present in the CREATE SCHEMA command, returns the schema from the command,
* otherwise loads the raw schema from the given file.
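*
* <p>Illustrative example of the LOAD clause (the path is hypothetical):
* <pre>{@code
* CREATE SCHEMA LOAD 'file:///tmp/raw.schema' FOR TABLE dfs.tmp.`nation`
* }</pre>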
*
* @param sqlCall sql create schema call
* @return string representation of the raw schema (column names, types and nullability)
*/
private String getSchemaString(SqlSchema.Create sqlCall) {
if (sqlCall.hasSchema()) {
return sqlCall.getSchema();
}
Path path = new Path(sqlCall.getLoad());
try {
FileSystem rawFs = path.getFileSystem(new Configuration());
FileSystem fs = ImpersonationUtil.createFileSystem(ImpersonationUtil.getProcessUserName(), rawFs.getConf());
if (!fs.exists(path)) {
throw UserException.resourceError()
.message("File with raw schema [%s] does not exist", path.toUri().getPath())
.build(logger);
}
try (InputStream stream = fs.open(path)) {
return IOUtils.toString(stream, StandardCharsets.UTF_8);
}
} catch (IOException e) {
throw UserException.resourceError(e)
.message("Unable to load raw schema from file %s", path.toUri().getPath())
.build(logger);
}
}
}
/**
* DROP SCHEMA command handler.
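*
* <p>Illustrative example (the table name is hypothetical):
* <pre>{@code
* DROP SCHEMA IF EXISTS FOR TABLE dfs.tmp.`nation`
* }</pre>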
*/
public static class Drop extends SchemaHandler {
public Drop(SqlHandlerConfig config) {
super(config);
}
@Override
public PhysicalPlan getPlan(SqlNode sqlNode) {
SqlSchema.Drop sqlCall = (SqlSchema.Drop) sqlNode;
try {
SchemaProvider schemaProvider = SchemaProviderFactory.create(sqlCall, this);
if (!schemaProvider.exists()) {
return produceErrorResult(String.format("Schema [%s] does not exist in table [%s] root directory",
SchemaProvider.DEFAULT_SCHEMA_NAME, sqlCall.getTable()), !sqlCall.ifExists());
}
schemaProvider.delete();
return DirectPlan.createDirectPlan(context, true,
String.format("Dropped schema for table [%s]", sqlCall.getTable()));
} catch (IOException e) {
throw UserException.resourceError(e)
.message(e.getMessage())
.addContext("Error while accessing table location or deleting schema for [%s]", sqlCall.getTable())
.build(logger);
}
}
}
/**
* DESCRIBE SCHEMA FOR TABLE command handler.
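*
* <p>Illustrative examples (the table name is hypothetical):
* <pre>{@code
* DESCRIBE SCHEMA FOR TABLE dfs.tmp.`nation` AS JSON
* DESCRIBE SCHEMA FOR TABLE dfs.tmp.`nation` AS STATEMENT
* }</pre>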
*/
public static class Describe extends SchemaHandler {
public Describe(SqlHandlerConfig config) {
super(config);
}
@Override
public PhysicalPlan getPlan(SqlNode sqlNode) {
SqlSchema.Describe sqlCall = (SqlSchema.Describe) sqlNode;
try {
SchemaProvider schemaProvider = SchemaProviderFactory.create(sqlCall, this);
if (schemaProvider.exists()) {
SchemaContainer schemaContainer = schemaProvider.read();
String schema;
switch (sqlCall.getFormat()) {
case JSON:
schema = PathSchemaProvider.WRITER.writeValueAsString(schemaContainer);
break;
case STATEMENT:
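// rebuild an equivalent CREATE OR REPLACE SCHEMA statement from the stored metadata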
TupleMetadata metadata = schemaContainer.getSchema();
StringBuilder builder = new StringBuilder("CREATE OR REPLACE SCHEMA \n");
List<ColumnMetadata> columnsMetadata = metadata.toMetadataList();
if (columnsMetadata.isEmpty()) {
builder.append("() \n");
} else {
builder.append("(\n");
builder.append(columnsMetadata.stream()
.map(ColumnMetadata::columnString)
.collect(Collectors.joining(", \n")));
builder.append("\n) \n");
}
builder.append("FOR TABLE ").append(schemaContainer.getTable()).append(" \n");
Map<String, String> properties = metadata.properties();
if (!properties.isEmpty()) {
builder.append("PROPERTIES (\n");
builder.append(properties.entrySet().stream()
.map(e -> String.format("'%s' = '%s'", e.getKey(), e.getValue()))
.collect(Collectors.joining(", \n")));
builder.append("\n)");
}
schema = builder.toString();
break;
default:
throw UserException.validationError()
.message("Unsupported describe schema format: [%s]", sqlCall.getFormat())
.build(logger);
}
return DirectPlan.createDirectPlan(context, new SchemaResult(schema));
}
return DirectPlan.createDirectPlan(context, false,
String.format("Schema for table [%s] is absent", sqlCall.getTable()));
} catch (IOException e) {
throw UserException.resourceError(e)
.message(e.getMessage())
.addContext("Error while accessing schema for table [%s]", sqlCall.getTable())
.build(logger);
}
}
/**
* Wrapper to output the schema in the form of a table with one column named `schema`.
*/
public static class SchemaResult {
public String schema;
public SchemaResult(String schema) {
this.schema = schema;
}
}
}
/**
* ALTER SCHEMA ADD command handler.
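*
* <p>Illustrative example (names and values are hypothetical):
* <pre>{@code
* ALTER SCHEMA FOR TABLE dfs.tmp.`nation` ADD OR REPLACE
* COLUMNS (extra_col VARCHAR)
* PROPERTIES ('drill.strict' = 'false')
* }</pre>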
*/
public static class Add extends SchemaHandler {
public Add(SqlHandlerConfig config) {
super(config);
}
@Override
public PhysicalPlan getPlan(SqlNode sqlNode) {
SqlSchema.Add addCall = (SqlSchema.Add) sqlNode;
String schemaSource = addCall.hasTable() ? addCall.getTable().toString() : addCall.getPath();
try {
SchemaProvider schemaProvider = SchemaProviderFactory.create(addCall, this);
if (!schemaProvider.exists()) {
throw UserException.resourceError()
.message("Schema does not exist for [%s]", schemaSource)
.addContext("Command: ALTER SCHEMA ADD")
.build(logger);
}
TupleMetadata currentSchema = schemaProvider.read().getSchema();
TupleMetadata newSchema = new TupleSchema();
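// build the new column list: with OR REPLACE, provided columns override same-named
// existing ones while keeping the original order; otherwise duplicate names fail validation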
if (addCall.hasSchema()) {
InlineSchemaProvider inlineSchemaProvider = new InlineSchemaProvider(addCall.getSchema());
TupleMetadata providedSchema = inlineSchemaProvider.read().getSchema();
if (addCall.isReplace()) {
Map<String, ColumnMetadata> columnsMap = Stream.concat(currentSchema.toMetadataList().stream(), providedSchema.toMetadataList().stream())
.collect(Collectors.toMap(
ColumnMetadata::name,
Function.identity(),
(o, n) -> n, // replace existing columns
LinkedHashMap::new)); // preserve initial order of the columns
columnsMap.values().forEach(newSchema::addColumn);
} else {
Stream.concat(currentSchema.toMetadataList().stream(), providedSchema.toMetadataList().stream())
.forEach(newSchema::addColumn);
}
} else {
currentSchema.toMetadataList().forEach(newSchema::addColumn);
}
if (addCall.hasProperties()) {
if (addCall.isReplace()) {
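// setProperties() merges into the existing map, so properties from addCall
// override current ones with the same key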
newSchema.setProperties(currentSchema.properties());
newSchema.setProperties(addCall.getProperties());
} else {
Map<String, String> newProperties = Stream.concat(currentSchema.properties().entrySet().stream(), addCall.getProperties().entrySet().stream())
.collect(Collectors.toMap(
Map.Entry::getKey,
Map.Entry::getValue)); // no merge function is supplied, so duplicate property keys fail fast
newSchema.setProperties(newProperties);
}
} else {
newSchema.setProperties(currentSchema.properties());
}
String schemaString = newSchema.toMetadataList().stream()
.map(ColumnMetadata::columnString)
.collect(Collectors.joining(", "));
StorageProperties storageProperties = StorageProperties.builder()
.storageStrategy(getStorageStrategy())
.overwrite()
.build();
schemaProvider.store(schemaString, newSchema.properties(), storageProperties);
return DirectPlan.createDirectPlan(context, true, String.format("Schema for [%s] was updated", schemaSource));
} catch (IOException e) {
throw UserException.resourceError(e)
.message("Error while accessing / modifying schema for [%s]: %s", schemaSource, e.getMessage())
.build(logger);
} catch (IllegalArgumentException | IllegalStateException e) {
throw UserException.validationError(e)
.message(e.getMessage())
.addContext("Error while preparing / creating schema for [%s]", schemaSource)
.build(logger);
}
}
}
/**
* ALTER SCHEMA REMOVE command handler.
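*
* <p>Illustrative example (names are hypothetical):
* <pre>{@code
* ALTER SCHEMA FOR TABLE dfs.tmp.`nation` REMOVE
* COLUMNS (`extra_col`)
* PROPERTIES ('drill.strict')
* }</pre>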
*/
public static class Remove extends SchemaHandler {
public Remove(SqlHandlerConfig config) {
super(config);
}
@Override
public PhysicalPlan getPlan(SqlNode sqlNode) {
SqlSchema.Remove removeCall = (SqlSchema.Remove) sqlNode;
String schemaSource = removeCall.hasTable() ? removeCall.getTable().toString() : removeCall.getPath();
try {
SchemaProvider schemaProvider = SchemaProviderFactory.create(removeCall, this);
if (!schemaProvider.exists()) {
throw UserException.resourceError()
.message("Schema does not exist for [%s]", schemaSource)
.addContext("Command: ALTER SCHEMA REMOVE")
.build(logger);
}
TupleMetadata currentSchema = schemaProvider.read().getSchema();
TupleMetadata newSchema = new TupleSchema();
List<String> columns = removeCall.getColumns();
currentSchema.toMetadataList().stream()
.filter(column -> columns == null || !columns.contains(column.name()))
.forEach(newSchema::addColumn);
newSchema.setProperties(currentSchema.properties());
if (removeCall.hasProperties()) {
removeCall.getProperties().forEach(newSchema::removeProperty);
}
StorageProperties storageProperties = StorageProperties.builder()
.storageStrategy(getStorageStrategy())
.overwrite()
.build();
String schemaString = newSchema.toMetadataList().stream()
.map(ColumnMetadata::columnString)
.collect(Collectors.joining(", "));
schemaProvider.store(schemaString, newSchema.properties(), storageProperties);
return DirectPlan.createDirectPlan(context, true, String.format("Schema for [%s] was updated", schemaSource));
} catch (IOException e) {
throw UserException.resourceError(e)
.message("Error while accessing / modifying schema for [%s]: %s", schemaSource, e.getMessage())
.build(logger);
} catch (IllegalArgumentException e) {
throw UserException.validationError(e)
.message(e.getMessage())
.addContext("Error while preparing / creating schema for [%s]", schemaSource)
.build(logger);
}
}
}
}