blob: 098ebdbc5779f0917f9ec5f8e73b05fb6ea7fc41 [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
import static java.util.Collections.unmodifiableList;
import static org.apache.drill.exec.dotdrill.DotDrillType.STATS;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.regex.Pattern;
import org.apache.calcite.schema.Function;
import org.apache.calcite.schema.Table;
import org.apache.commons.lang3.SystemUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.drill.common.config.LogicalPlanPersistence;
import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.logical.FormatPluginConfig;
import org.apache.drill.common.scanner.persistence.ScanResult;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.dotdrill.DotDrillFile;
import org.apache.drill.exec.dotdrill.DotDrillType;
import org.apache.drill.exec.dotdrill.DotDrillUtil;
import org.apache.drill.exec.dotdrill.View;
import org.apache.drill.exec.metastore.FileSystemMetadataProviderManager;
import org.apache.drill.exec.metastore.MetadataProviderManager;
import org.apache.drill.exec.planner.common.DrillStatsTable;
import org.apache.drill.exec.planner.logical.CreateTableEntry;
import org.apache.drill.exec.planner.logical.DrillTable;
import org.apache.drill.exec.planner.logical.DrillViewTable;
import org.apache.drill.exec.planner.logical.DynamicDrillTable;
import org.apache.drill.exec.planner.logical.FileSystemCreateTableEntry;
import org.apache.drill.exec.planner.sql.ExpandingConcurrentMap;
import org.apache.drill.exec.record.metadata.schema.FsMetastoreSchemaProvider;
import org.apache.drill.exec.util.DrillFileSystemUtil;
import org.apache.drill.exec.util.ImpersonationUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import com.fasterxml.jackson.databind.ObjectMapper;
public class WorkspaceSchemaFactory {
// Logger shared by this factory and its nested classes.
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(WorkspaceSchemaFactory.class);
// Matchers consulted when resolving individual files; order-preserving, and may end with the
// configured default-input-format fallback matcher (see constructor).
private final List<FormatMatcher> fileMatchers;
// View over fileMatchers used by findMatcher() for DROP TABLE homogeneity checks; excludes the last
// entry when a default input format is configured.
private final List<FormatMatcher> dropFileMatchers;
// Matchers whose supportDirectoryReads() is true; consulted by create() when a selection has directories.
private final List<FormatMatcher> dirMatchers;
// Workspace configuration: location, writability, default input format, outside-workspace access.
private final WorkspaceConfig config;
// Hadoop configuration used to open per-user file systems.
private final Configuration fsConf;
// Name of the owning storage plugin (used in error messages and when creating DynamicDrillTables).
private final String storageEngineName;
// Name of this workspace.
private final String schemaName;
private final FileSystemPlugin plugin;
// Mapper used to serialize view definitions in createView().
private final ObjectMapper mapper;
// Used to deserialize view definitions from dot-drill files.
private final LogicalPlanPersistence logicalPlanPersistence;
// Workspace root directory; the target of the accessible() checks.
private final Path wsPath;
// Derives table-function signatures and format configs from format plugin options.
private final FormatPluginOptionExtractor optionExtractor;
/**
 * Builds the factory for one workspace of a file system storage plugin.
 *
 * @param plugin                 owning plugin; supplies the Hadoop configuration and format plugins
 * @param schemaName             workspace name
 * @param storageEngineName      name of the storage plugin (used in error messages and table creation)
 * @param config                 workspace configuration (location, default input format, ...)
 * @param formatMatchers         matchers for the plugin's formats
 * @param logicalPlanPersistence provides the object mapper used for view (de)serialization
 * @param scanResult             classpath scan result used to discover format plugin options
 * @throws ExecutionSetupException if a default input format is configured but no such format plugin exists
 */
public WorkspaceSchemaFactory(
FileSystemPlugin plugin,
String schemaName,
String storageEngineName,
WorkspaceConfig config,
List<FormatMatcher> formatMatchers,
LogicalPlanPersistence logicalPlanPersistence,
ScanResult scanResult) throws ExecutionSetupException {
this.logicalPlanPersistence = logicalPlanPersistence;
this.fsConf = plugin.getFsConf();
this.plugin = plugin;
this.config = config;
this.mapper = logicalPlanPersistence.getMapper();
this.fileMatchers = Lists.newArrayList();
this.dirMatchers = Lists.newArrayList();
this.storageEngineName = storageEngineName;
this.schemaName = schemaName;
this.wsPath = new Path(config.getLocation());
this.optionExtractor = new FormatPluginOptionExtractor(scanResult);
// NOTE(review): in this view the loop below has no visible statement that populates fileMatchers or
// dirMatchers (only the supportDirectoryReads() test is present); presumably every matcher goes into
// fileMatchers and directory-capable ones also into dirMatchers -- verify against the full file.
for (FormatMatcher m : formatMatchers) {
if (m.supportDirectoryReads()) {
// NOTE: Add fallback format matcher if given in the configuration. Make sure fileMatchers is an order-preserving list.
final String defaultInputFormat = config.getDefaultInputFormat();
if (!Strings.isNullOrEmpty(defaultInputFormat)) {
final FormatPlugin formatPlugin = plugin.getFormatPlugin(defaultInputFormat);
if (formatPlugin == null) {
final String message = String.format("Unable to find default input format[%s] for workspace[%s.%s]",
defaultInputFormat, storageEngineName, schemaName);
throw new ExecutionSetupException(message);
// Catch-all matcher (pattern ".*") for the configured default input format.
final FormatMatcher fallbackMatcher = new BasicFormatMatcher(formatPlugin,
ImmutableList.of(Pattern.compile(".*")), ImmutableList.of());
// Excludes the last entry of fileMatchers -- presumably the fallback matcher, whose append is not
// visible here -- so that findMatcher() (used by DROP TABLE) does not accept arbitrary files.
dropFileMatchers = fileMatchers.subList(0, fileMatchers.size() - 1);
} else {
// No fallback configured: every file matcher takes part in DROP TABLE checks.
dropFileMatchers = fileMatchers.subList(0, fileMatchers.size());
* Checks whether the given user has permission to list files/directories under the workspace directory.
* @param userName User who is trying to access the workspace.
* @return True if the user has access. False otherwise.
public boolean accessible(final String userName) throws IOException {
final DrillFileSystem fs = ImpersonationUtil.createFileSystem(userName, fsConf);
return accessible(fs);
* Checks whether a FileSystem object has the permission to list/read workspace directory
* @param fs a DrillFileSystem object that was created with certain user privilege
* @return True if the user has access. False otherwise.
public boolean accessible(DrillFileSystem fs) throws IOException {
try {
* For Windows local file system, fs.access ends up using DeprecatedRawLocalFileStatus which has
* TrustedInstaller as owner, and a member of Administrators group could not satisfy the permission.
* In this case, we will still use method listStatus.
* In other cases, we use access method since it is cheaper.
if (SystemUtils.IS_OS_WINDOWS && fs.getUri().getScheme().equalsIgnoreCase(FileSystemSchemaFactory.LOCAL_FS_SCHEME)) {
else {
fs.access(wsPath, FsAction.READ);
} catch (final UnsupportedOperationException e) {
logger.trace("The filesystem for this workspace does not support this operation.", e);
} catch (final FileNotFoundException | AccessControlException e) {
return false;
return true;
private Path getViewPath(String name) {
return DotDrillType.VIEW.getPath(config.getLocation(), name);
public WorkspaceSchema createSchema(List<String> parentSchemaPath, SchemaConfig schemaConfig, DrillFileSystem fs) throws IOException {
if (!accessible(fs)) {
return null;
return new WorkspaceSchema(parentSchemaPath, schemaName, schemaConfig, fs);
public String getSchemaName() {
return schemaName;
public FileSystemPlugin getPlugin() {
return plugin;
// Ensure given tableName is not a stats table
// Rejects names ending with the STATS dot-drill suffix (compared after lower-casing the name).
// NOTE(review): the UserException builder chain below is not terminated (no build(...)) in this view, and
// no caller of this method is visible -- verify against the full file.
// NOTE(review): toLowerCase() uses the default locale; Locale.ROOT would make the match locale-independent.
private static void ensureNotStatsTable(final String tableName) {
if (tableName.toLowerCase().endsWith(STATS.getEnding())) {
throw UserException
.message("Given table [%s] is already a stats table. " +
"Cannot perform stats operations on a stats table.", tableName)
private static Object[] array(Object... objects) {
return objects;
public static final class TableInstance {
final TableSignature sig;
final List<Object> params;
public TableInstance(TableSignature sig, List<Object> params) {
if (params.size() != sig.getParams().size()) {
throw UserException.parseError()
"should have as many params (%d) as signature (%d)",
params.size(), sig.getParams().size())
.addContext("table", sig.getName())
this.sig = sig;
this.params = unmodifiableList(params);
String presentParams() {
StringBuilder sb = new StringBuilder("(");
boolean first = true;
for (int i = 0; i < params.size(); i++) {
Object param = params.get(i);
if (param != null) {
if (first) {
first = false;
} else {
sb.append(", ");
TableParamDef paramDef = sig.getParams().get(i);
sb.append(paramDef.getName()).append(": ").append(paramDef.getType().getSimpleName()).append(" => ").append(param);
return sb.toString();
private Object[] toArray() {
return array(sig, params);
public int hashCode() {
return Arrays.hashCode(toArray());
public boolean equals(Object obj) {
if (obj instanceof TableInstance) {
return Arrays.equals(this.toArray(), ((TableInstance)obj).toArray());
return false;
public String toString() {
return sig.getName() + (params.size() == 0 ? "" : presentParams());
public class WorkspaceSchema extends AbstractSchema implements ExpandingConcurrentMap.MapValueFactory<TableInstance, DrillTable> {
// Tables resolved in this workspace, keyed by signature + parameters. This schema is the map's
// MapValueFactory, so missing values are produced by create(TableInstance).
private final ExpandingConcurrentMap<TableInstance, DrillTable> tables = new ExpandingConcurrentMap<>(this);
// Schema configuration: options, user name, view expansion context, auth-error handling.
private final SchemaConfig schemaConfig;
// File system supplied by the creator; used for regular workspace I/O through getFS().
private DrillFileSystem fs;
// Drill Process User file-system
private DrillFileSystem dpsFs;
public WorkspaceSchema(List<String> parentSchemaPath, String wsName, SchemaConfig schemaConfig, DrillFileSystem fs) {
super(parentSchemaPath, wsName);
this.schemaConfig = schemaConfig;
this.fs = fs;
this.dpsFs = ImpersonationUtil.createFileSystem(ImpersonationUtil.getProcessUserName(), fsConf);
DrillTable getDrillTable(TableInstance key) {
return tables.get(key);
public boolean createView(View view) throws IOException {
Path viewPath = getViewPath(view.getName());
boolean replaced = getFS().exists(viewPath);
final FsPermission viewPerms =
new FsPermission(schemaConfig.getOption(ExecConstants.NEW_VIEW_DEFAULT_PERMS_KEY).string_val);
try (OutputStream stream = DrillFileSystem.create(getFS(), viewPath, viewPerms)) {
mapper.writeValue(stream, view);
return replaced;
public Iterable<String> getSubPartitions(String table,
List<String> partitionColumns,
List<String> partitionValues
) throws PartitionNotFoundException {
List<FileStatus> fileStatuses;
try {
fileStatuses = DrillFileSystemUtil.listDirectories(getFS(), new Path(getDefaultLocation(), table), false);
} catch (IOException e) {
throw new PartitionNotFoundException("Error finding partitions for table " + table, e);
return new SubDirectoryList(fileStatuses);
public void dropView(String viewName) throws IOException {
getFS().delete(getViewPath(viewName), false);
// Intended to return the names of the views stored as dot-drill VIEW files in the workspace location.
// UnsupportedOperationException is logged at debug, other failures at warn, and the (possibly partial)
// set is returned; AccessControlException is rethrown unless schemaConfig ignores auth errors.
// NOTE(review): in this view the for-loop has no body, so viewSet is never populated, and the
// UserException builder chain is not terminated with build(...) -- verify against the full file.
private Set<String> getViews() {
Set<String> viewSet = Sets.newHashSet();
// Look for files with ".view.drill" extension.
List<DotDrillFile> files;
try {
files = DotDrillUtil.getDotDrills(getFS(), new Path(config.getLocation()), DotDrillType.VIEW);
for (DotDrillFile f : files) {
} catch (UnsupportedOperationException e) {
logger.debug("The filesystem for this workspace does not support this operation.", e);
} catch (AccessControlException e) {
if (!schemaConfig.getIgnoreAuthErrors()) {
throw UserException
.message("Not authorized to list view tables in schema [%s]", getFullSchemaName())
} catch (Exception e) {
logger.warn("Failure while trying to list .view.drill files in workspace [{}]", getFullSchemaName(), e);
return viewSet;
// Names (signature names) of the tables currently held in the tables map.
// NOTE(review): the stream pipeline has no terminal operation in this view; a collector to a Set is
// required for this to compile -- verify against the full file.
private Set<String> rawTableNames() {
return tables.keySet().stream()
.map(input -> input.sig.getName())
public Set<String> getTableNames() {
return Sets.union(rawTableNames(), getViews());
public Set<String> getFunctionNames() {
return rawTableNames();
// Returns the parent schema's functions for name, intended to be followed by one WithOptionsTableMacro per
// table signature that optionExtractor derives for name.
// NOTE(review): the receiver of .map(...) (presumably signatures.stream()) and the terminal operation that
// adds the macros to functions are not visible in this view -- verify against the full file.
public List<Function> getFunctions(String name) {
// add parent functions first
List<Function> functions = new ArrayList<>(super.getFunctions(name));
List<TableParamDef> tableParameters = getFunctionParameters();
List<TableSignature> signatures = optionExtractor.getTableSignatures(name, tableParameters);
// Each macro resolves its arguments to a DrillTable keyed by (signature, actual params).
.map(signature -> new WithOptionsTableMacro(signature, params -> getDrillTable(new TableInstance(signature, params))))
return functions;
private View getView(DotDrillFile f) throws IOException {
assert f.getType() == DotDrillType.VIEW;
return f.getView(logicalPlanPersistence);
// Resolves tableName in this workspace, in this order:
//  1. a table already present in the tables map is returned as-is;
//  2. a matching VIEW dot-drill file yields a DrillViewTable;
//  3. otherwise the tables map resolves it (see create(TableInstance)); if found, stats and schema
//     providers are prepared via setMetadataTable()/setSchema().
// Authorization failures are rethrown as UserException unless schemaConfig.getIgnoreAuthErrors() is set.
// NOTE(review): several lines are missing in this view (UserException build(...) calls, closing braces);
// providerManager is also never attached to the table in the visible code -- verify against the full file.
public Table getTable(String tableName) {
TableInstance tableKey = new TableInstance(TableSignature.of(tableName), ImmutableList.of());
// first check existing tables.
if (tables.alreadyContainsKey(tableKey)) {
return tables.get(tableKey);
// then look for files that start with this name and end in .drill.
List<DotDrillFile> files = Collections.emptyList();
try {
try {
files = DotDrillUtil.getDotDrills(getFS(), new Path(config.getLocation()),
FileSelection.removeLeadingSlash(tableName), DotDrillType.VIEW);
} catch (AccessControlException e) {
if (!schemaConfig.getIgnoreAuthErrors()) {
throw UserException.permissionError(e)
.message("Not authorized to list or query tables in schema [%s]", getFullSchemaName())
} catch (IOException e) {
logger.warn("Failure while trying to list view tables in workspace [{}]", getFullSchemaName(), e);
for (DotDrillFile f : files) {
switch (f.getType()) {
case VIEW:
try {
return new DrillViewTable(getView(f), f.getOwner(), schemaConfig.getViewExpansionContext());
} catch (AccessControlException e) {
if (!schemaConfig.getIgnoreAuthErrors()) {
throw UserException.permissionError(e)
.message("Not authorized to read view [%s] in schema [%s]", tableName, getFullSchemaName())
} catch (IOException e) {
logger.warn("Failure while trying to load {}.view.drill file in workspace [{}]", tableName, getFullSchemaName(), e);
} catch (UnsupportedOperationException e) {
logger.debug("The filesystem for this workspace does not support this operation.", e);
final DrillTable table = tables.get(tableKey);
if (table != null) {
MetadataProviderManager providerManager = FileSystemMetadataProviderManager.init();
setMetadataTable(providerManager, table, tableName);
setSchema(providerManager, tableName);
return table;
// When STORE_TABLE_USE_SCHEMA_FILE is enabled, creates a file-based schema provider for tableName;
// an IOException during initialization is logged at debug level and ignored.
// NOTE(review): schemaProvider is never used in this view, so providerManager is not updated;
// presumably it should be registered on providerManager -- verify against the full file.
private void setSchema(MetadataProviderManager providerManager, String tableName) {
if (schemaConfig.getOption(ExecConstants.STORE_TABLE_USE_SCHEMA_FILE).bool_val) {
try {
FsMetastoreSchemaProvider schemaProvider = new FsMetastoreSchemaProvider(this, tableName);
} catch (IOException e) {
logger.debug("Unable to init schema provider for table [{}]", tableName, e);
private void setMetadataTable(MetadataProviderManager metadataProviderManager, DrillTable table, final String tableName) {
// If this itself is the stats table, then skip it.
if (tableName.toLowerCase().endsWith(STATS.getEnding())) {
try {
String statsTableName = getStatsTableName(tableName);
Path statsTableFilePath = getStatsTableFilePath(tableName);
metadataProviderManager.setStatsProvider(new DrillStatsTable(table, getFullSchemaName(), statsTableName,
statsTableFilePath, fs));
} catch (Exception e) {
logger.warn("Failed to find the stats table for table [{}] in schema [{}]",
tableName, getFullSchemaName());
// Get stats table name for a given table name.
private String getStatsTableName(final String tableName) {
// Access stats file as DRILL process user (not impersonated user)
final Path tablePath = new Path(config.getLocation(), tableName);
try {
String name;
if (dpsFs.isDirectory(tablePath)) {
name = tableName + Path.SEPARATOR + STATS.getEnding();
if (dpsFs.isDirectory(new Path(name))) {
return name;
} else {
//TODO: Not really useful. Remove?
name = tableName + STATS.getEnding();
if (dpsFs.isFile(new Path(name))) {
return name;
return name;
} catch (final Exception e) {
throw new DrillRuntimeException(
String.format("Failed to find the stats for table [%s] in schema [%s]",
tableName, getFullSchemaName()));
// Get stats table file (JSON) path for the given table name.
private Path getStatsTableFilePath(final String tableName) {
// Access stats file as DRILL process user (not impersonated user)
final Path tablePath = new Path(config.getLocation(), tableName);
try {
Path stFPath = null;
if (dpsFs.isDirectory(tablePath)) {
stFPath = new Path(tablePath, STATS.getEnding()+ Path.SEPARATOR + "0_0.json");
if (dpsFs.isFile(stFPath)) {
return stFPath;
return stFPath;
} catch (final Exception e) {
throw new DrillRuntimeException(
String.format("Failed to find the the stats for table [%s] in schema [%s]",
tableName, getFullSchemaName()));
public boolean isMutable() {
return config.isWritable();
public DrillFileSystem getFS() {
return fs;
public String getDefaultLocation() {
return config.getLocation();
public CreateTableEntry createNewTable(String tableName, List<String> partitionColumns, StorageStrategy storageStrategy) {
String storage = schemaConfig.getOption(ExecConstants.OUTPUT_FORMAT_OPTION).string_val;
FormatPlugin formatPlugin = plugin.getFormatPlugin(storage);
return createOrAppendToTable(tableName, formatPlugin, partitionColumns, storageStrategy);
// Creates a table entry for tableName's stats, using the JSON format plugin and the name computed by
// getStatsTableName(), with no partition columns.
// NOTE(review): the createOrAppendToTable argument list is cut off in this view (the StorageStrategy
// argument is not visible), and tableName is not checked with ensureNotStatsTable() here -- verify.
public CreateTableEntry createStatsTable(String tableName) {
final String statsTableName = getStatsTableName(tableName);
FormatPlugin formatPlugin = plugin.getFormatPlugin(JSONFormatPlugin.DEFAULT_NAME);
return createOrAppendToTable(statsTableName, formatPlugin, Collections.emptyList(),
// Creates a table entry for appending to tableName's stats (JSON format, name from getStatsTableName()).
// NOTE(review): the createOrAppendToTable argument list is cut off in this view (the StorageStrategy
// argument is not visible) -- verify against the full file.
public CreateTableEntry appendToStatsTable(String tableName) {
final String statsTableName = getStatsTableName(tableName);
FormatPlugin formatPlugin = plugin.getFormatPlugin(JSONFormatPlugin.DEFAULT_NAME);
return createOrAppendToTable(statsTableName, formatPlugin, Collections.emptyList(),
public Table getStatsTable(String tableName) {
return getTable(getStatsTableName(tableName));
// Builds a FileSystemCreateTableEntry for tableName under the workspace location.
// Throws UnsupportedOperationException when formatPlugin is null.
// NOTE(review): the error message reports config.getDefaultInputFormat(), but callers look formatPlugin up
// by other names (OUTPUT_FORMAT_OPTION, JSONFormatPlugin.DEFAULT_NAME), so the reported format may not be
// the one that was missing.
// NOTE(review): the String.format arguments and the remaining constructor arguments are cut off in this view.
private CreateTableEntry createOrAppendToTable(String tableName, FormatPlugin formatPlugin,
List<String> partitionColumns, StorageStrategy storageStrategy) {
if (formatPlugin == null) {
throw new UnsupportedOperationException(
String.format("Unsupported format '%s' in workspace '%s'", config.getDefaultInputFormat(),
return new FileSystemCreateTableEntry(
(FileSystemConfig) plugin.getConfig(),
config.getLocation() + Path.SEPARATOR + tableName,
public String getTypeName() {
return FileSystemConfig.NAME;
// MapValueFactory callback: resolves key into a DrillTable, or returns null when nothing usable is found.
//  - If the signature has parameters (table function), a DynamicDrillTable is built from a format config
//    derived from key; common parameters are applied afterwards.
//  - Otherwise directory matchers are tried first (when the selection has directories), then file matchers.
// Empty directories yield a DynamicDrillTable over the original selection. AccessControlException is
// rethrown as UserException unless auth errors are ignored; other IOExceptions are logged at debug level.
// NOTE(review): several lines are missing in this view (closing braces, UserException build(...)).
public DrillTable create(TableInstance key) {
try {
final FileSelection fileSelection = FileSelection.create(getFS(), config.getLocation(), key.sig.getName(), config.allowAccessOutsideWorkspace());
if (fileSelection == null) {
return null;
boolean hasDirectories = fileSelection.containsDirectories(getFS());
if (key.sig.getParams().size() > 0) {
FileSelection newSelection = detectEmptySelection(fileSelection, hasDirectories);
if (newSelection.isEmptyDirectory()) {
return new DynamicDrillTable(plugin, storageEngineName, schemaConfig.getUserName(), fileSelection);
FormatPluginConfig formatConfig = optionExtractor.createConfigForTable(key);
FormatSelection selection = new FormatSelection(formatConfig, newSelection);
DrillTable drillTable = new DynamicDrillTable(plugin, storageEngineName, schemaConfig.getUserName(), selection);
List<TableParamDef> commonParams = key.sig.getCommonParams();
if (commonParams.isEmpty()) {
return drillTable;
// extract only common parameters related values
List<Object> paramValues = key.params.subList(key.params.size() - commonParams.size(), key.params.size());
return applyFunctionParameters(drillTable, commonParams, paramValues);
if (hasDirectories) {
for (final FormatMatcher matcher : dirMatchers) {
try {
DrillTable table = matcher.isReadable(getFS(), fileSelection, plugin, storageEngineName, schemaConfig);
if (table != null) {
return table;
} catch (IOException e) {
logger.debug("File read failed.", e);
FileSelection newSelection = detectEmptySelection(fileSelection, hasDirectories);
if (newSelection.isEmptyDirectory()) {
return new DynamicDrillTable(plugin, storageEngineName, schemaConfig.getUserName(), fileSelection);
// Unlike the directory loop, an IOException here is not caught per matcher: it ends the search.
for (final FormatMatcher matcher : fileMatchers) {
DrillTable table = matcher.isReadable(getFS(), newSelection, plugin, storageEngineName, schemaConfig);
if (table != null) {
return table;
return null;
} catch (AccessControlException e) {
if (!schemaConfig.getIgnoreAuthErrors()) {
throw UserException.permissionError(e)
.message("Not authorized to read table [%s] in schema [%s]", key, getFullSchemaName())
} catch (IOException e) {
logger.debug("Failed to create DrillTable with root {} and name {}", config.getLocation(), key, e);
return null;
* Expands given file selection if it has directories.
* If expanded file selection is null (i.e. directory is empty), sets empty directory status to true.
* @param fileSelection file selection
* @param hasDirectories flag that indicates if given file selection has directories
* @return revisited file selection
private FileSelection detectEmptySelection(FileSelection fileSelection, boolean hasDirectories) throws IOException {
FileSelection newSelection = hasDirectories ? fileSelection.minusDirectories(getFS()) : fileSelection;
if (newSelection == null) {
// empty directory / selection means that this is the empty and schemaless table
return fileSelection;
return newSelection;
private FormatMatcher findMatcher(FileStatus file) {
try {
for (FormatMatcher m : dropFileMatchers) {
if (m.isFileReadable(getFS(), file)) {
return m;
} catch (IOException e) {
logger.debug("Failed to find format matcher for file: {}", file, e);
return null;
public void destroy(DrillTable value) {
/**
 * Checks whether the table contains files of a single format that Drill can read, e.g. Parquet, JSON or CSV.
 * If it contains more than one of these formats, or files in a format Drill cannot read, this method
 * returns false (DROP TABLE then refuses to drop the table).
 *
 * @param tableName name of the table to be checked for the homogeneous property
 * @return true if the table contains homogeneous files, false otherwise
 * @throws IOException in case of problems accessing table files
 */
// Walks every file of the table (directories are expanded via DrillFileSystemUtil.listFiles(..., true)).
// The first regular file selects a matcher through findMatcher(); every later file must be readable by
// that same matcher. A missing table raises a UserException.
// NOTE(review): the UserException builder chain for the "not found" case is not terminated in this view.
private boolean isHomogeneous(String tableName) throws IOException {
FileSelection fileSelection = FileSelection.create(getFS(), config.getLocation(), tableName, config.allowAccessOutsideWorkspace());
if (fileSelection == null) {
throw UserException
.message(String.format("Table [%s] not found", tableName))
FormatMatcher matcher = null;
Queue<FileStatus> listOfFiles = new LinkedList<>(fileSelection.getStatuses(getFS()));
while (!listOfFiles.isEmpty()) {
FileStatus currentFile = listOfFiles.poll();
if (currentFile.isDirectory()) {
listOfFiles.addAll(DrillFileSystemUtil.listFiles(getFS(), currentFile.getPath(), true));
} else {
if (matcher != null) {
// A file the first matcher cannot read means mixed formats.
if (!matcher.isFileReadable(getFS(), currentFile)) {
return false;
} else {
matcher = findMatcher(currentFile);
// Did not match any of the file patterns, exit
if (matcher == null) {
return false;
return true;
/**
 * Drops a table after checking that it contains homogeneous file formats that Drill can read. Once the checks
 * pass, the table is renamed to a name starting with "_" and the renamed path is then deleted recursively.
 *
 * @param table path of the table to be dropped, relative to the workspace location
 */
// Steps:
//  1. verify the table is homogeneous;
//  2. rename it, within the same parent directory, to a hidden uniquely-suffixed name;
//  3. recursively delete the renamed path.
// AccessControlException and IOException are reported as UserExceptions.
// NOTE(review): several lines are missing in this view: the appends that assemble the new name from
// fileNameDelimiter, p1, p2 and the last path segment; the opening of the comment block; and the
// UserException build(...) calls. The boolean results of fs.rename/fs.delete are not checked.
public void dropTable(String table) {
DrillFileSystem fs = getFS();
String defaultLocation = getDefaultLocation();
try {
if (!isHomogeneous(table)) {
throw UserException
.message("Table contains different file formats. \n" +
"Drop Table is only supported for directories that contain homogeneous file formats consumable by Drill")
StringBuilder tableRenameBuilder = new StringBuilder();
int lastSlashIndex = table.lastIndexOf(Path.SEPARATOR);
if (lastSlashIndex != -1) {
// Keep the parent path so the rename stays in the same directory.
tableRenameBuilder.append(table, 0, lastSlashIndex + 1);
// Generate unique identifier which will be added as a suffix to the table name
ThreadLocalRandom r = ThreadLocalRandom.current();
long time = (System.currentTimeMillis()/1000);
long p1 = ((Integer.MAX_VALUE - time) << 32) + r.nextInt();
long p2 = r.nextLong();
final String fileNameDelimiter = DrillFileSystem.UNDERSCORE_PREFIX;
String[] pathSplit = table.split(Path.SEPARATOR);
* Builds the string for the renamed table
* Prefixes the table name with an underscore (intent for this to be treated as a hidden file)
* and suffixes the table name with unique identifiers (similar to how we generate query id's)
* separated by underscores
.append(pathSplit[pathSplit.length - 1])
String tableRename = tableRenameBuilder.toString();
fs.rename(new Path(defaultLocation, table), new Path(defaultLocation, tableRename));
fs.delete(new Path(defaultLocation, tableRename), true);
} catch (AccessControlException e) {
throw UserException
.message("Unauthorized to drop table")
} catch (IOException e) {
throw UserException
.message("Failed to drop table: " + e.getMessage())
public List<Map.Entry<String, TableType>> getTableNamesAndTypes() {
return Stream.concat(
tables.entrySet().stream().map(kv -> Pair.of(kv.getKey().sig.getName(), kv.getValue().getJdbcTableType())),
getViews().stream().map(viewName -> Pair.of(viewName, TableType.VIEW))