blob: d704fd828b62c9d42115f6b80d8313493f844b2d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.record.metadata.schema;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.util.DefaultIndenter;
import com.fasterxml.jackson.core.util.DefaultPrettyPrinter;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectReader;
import com.fasterxml.jackson.databind.ObjectWriter;
import org.apache.drill.common.util.JacksonUtils;
import org.apache.drill.exec.util.ImpersonationUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Map;
import static com.fasterxml.jackson.databind.SerializationFeature.INDENT_OUTPUT;
/**
 * Is used to provide schema using given schema file name and path.
 * The schema is stored as a JSON file which is written with {@link #WRITER}
 * and read back with {@link #READER}.
 */
public class PathSchemaProvider implements SchemaProvider {

  /**
   * Reader used to read JSON schema from file into {@link SchemaContainer}.
   * Allows comments inside the JSON file.
   */
  public static final ObjectReader READER;

  /**
   * Writer used to write content from {@link SchemaContainer} into JSON file.
   */
  public static final ObjectWriter WRITER;

  static {
    // A single mapper backs both constants: indented output, comments accepted on input.
    ObjectMapper mapper = JacksonUtils.createObjectMapper().enable(INDENT_OUTPUT)
      .configure(JsonParser.Feature.ALLOW_COMMENTS, true);

    READER = mapper.readerFor(SchemaContainer.class);

    // Arrays are indented with the system line feed indenter instead of the printer's default one.
    DefaultPrettyPrinter prettyPrinter = new DefaultPrettyPrinter();
    prettyPrinter = prettyPrinter.withArrayIndenter(DefaultIndenter.SYSTEM_LINEFEED_INSTANCE);

    WRITER = mapper.writer(prettyPrinter);
  }

  private final Path path;
  private final FileSystem fs;

  /**
   * Creates schema provider for the given schema file path.
   * The file system is obtained from the path using a newly created {@link Configuration}.
   *
   * @param path schema file path
   * @throws IOException if file system cannot be obtained or parent path does not exist
   */
  public PathSchemaProvider(Path path) throws IOException {
    this(createFsFromPath(path), path);
  }

  /**
   * Creates schema provider for the given schema file path.
   * The provider works with a file system created through {@link ImpersonationUtil}
   * for the process user name, based on the configuration of the given file system.
   * Existence of the parent path is verified using the given file system.
   *
   * @param fs file system which provides the configuration and is used for the parent path check
   * @param path schema file path
   * @throws IOException if schema file path has no parent or the parent does not exist
   */
  public PathSchemaProvider(FileSystem fs, Path path) throws IOException {
    this.fs = ImpersonationUtil.createFileSystem(ImpersonationUtil.getProcessUserName(), fs.getConf());

    // getParent() returns null for a root path: report it as a missing parent
    // instead of handing null over to exists()
    Path parent = path.getParent();
    if (parent == null || !fs.exists(parent)) {
      throw new IOException(String.format("Parent path for schema file [%s] does not exist", path.toUri().getPath()));
    }

    this.path = path;
  }

  /**
   * Obtains file system which owns the given path using a newly created {@link Configuration}.
   *
   * @param path schema file path
   * @return file system for the given path
   * @throws IOException if file system cannot be obtained
   */
  private static FileSystem createFsFromPath(Path path) throws IOException {
    return path.getFileSystem(new Configuration());
  }

  /**
   * Deletes schema file (non-recursively).
   * If deletion fails but the file is found to be absent afterwards,
   * the failure is ignored to cover concurrent deletion case.
   *
   * @throws IOException if schema file was not deleted and still exists,
   *         or if its existence could not be re-checked after the failure
   */
  @Override
  public void delete() throws IOException {
    try {
      if (!fs.delete(path, false)) {
        throw new IOException(String.format("Error while deleting schema file [%s]", path.toUri().getPath()));
      }
    } catch (IOException deleteFailure) {
      // re-check file existence to cover concurrent deletion case
      boolean stillExists;
      try {
        stillExists = exists();
      } catch (IOException checkFailure) {
        // throw original exception, keeping the re-check failure for diagnostics
        deleteFailure.addSuppressed(checkFailure);
        throw deleteFailure;
      }

      if (stillExists) {
        throw deleteFailure;
      }
    }
  }

  /**
   * Writes schema and its properties into the schema file as JSON
   * and then applies storage strategy from the given storage properties to the file.
   *
   * @param schema schema in string form
   * @param properties schema properties
   * @param storageProperties define whether existing file is overwritten and which storage strategy is applied
   * @throws IOException if schema file cannot be created or written
   */
  @Override
  public void store(String schema, Map<String, String> properties, StorageProperties storageProperties) throws IOException {
    SchemaContainer tableSchema = createTableSchema(schema, properties);

    try (OutputStream stream = fs.create(path, storageProperties.isOverwrite())) {
      WRITER.writeValue(stream, tableSchema);
    }

    storageProperties.getStorageStrategy().applyToFile(fs, path);
  }

  /**
   * Reads schema file content into {@link SchemaContainer}.
   *
   * @return schema container
   * @throws IOException if schema file cannot be opened or parsed
   */
  @Override
  public SchemaContainer read() throws IOException {
    try (InputStream stream = fs.open(path)) {
      return READER.readValue(stream);
    }
  }

  /**
   * Checks if schema file exists.
   *
   * @return true if schema file exists, false otherwise
   * @throws IOException if existence check fails
   */
  @Override
  public boolean exists() throws IOException {
    return fs.exists(path);
  }

  /**
   * Creates schema container which is written into the schema file.
   * Can be overridden to build the container differently.
   *
   * @param schema schema in string form
   * @param properties schema properties
   * @return schema container
   * @throws IOException declared for overriding implementations, this implementation does not throw it itself
   */
  protected SchemaContainer createTableSchema(String schema, Map<String, String> properties) throws IOException {
    // NOTE(review): first constructor argument is passed as null, presumably the table name
    // which is not known here - confirm against SchemaContainer
    return new SchemaContainer(null, schema, properties);
  }
}