blob: 10fa4cf067bf1032a1c3cd9df79321e15df2a5bf [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.carbondata.sdk.file;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datastore.FileReader;
import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore;
import org.apache.carbondata.core.metadata.converter.SchemaConverter;
import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl;
import org.apache.carbondata.core.metadata.datatype.DataTypes;
import org.apache.carbondata.core.metadata.datatype.Field;
import org.apache.carbondata.core.metadata.datatype.StructField;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.metadata.schema.table.TableSchema;
import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
import org.apache.carbondata.core.reader.CarbonFooterReaderV3;
import org.apache.carbondata.core.reader.CarbonHeaderReader;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.core.util.path.CarbonTablePath;
import org.apache.carbondata.format.FileFooter3;
import org.apache.carbondata.format.IndexHeader;
import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException;
import org.apache.carbondata.sdk.file.arrow.ArrowConverter;
import org.apache.hadoop.conf.Configuration;
import static org.apache.carbondata.core.util.CarbonUtil.thriftColumnSchemaToWrapperColumnSchema;
import static org.apache.carbondata.core.util.path.CarbonTablePath.CARBON_DATA_EXT;
import static org.apache.carbondata.core.util.path.CarbonTablePath.INDEX_FILE_EXT;
/**
* Schema reader for carbon files, including carbondata file, carbonindex file, and schema file
*/
public class CarbonSchemaReader {
/**
 * Reads a schema file and builds a {@link Schema} from the column list of its fact table.
 *
 * @param schemaFilePath complete path including schema file name
 * @return schema holding the columns listed by the fact table
 * @throws IOException declared by this method; raised by the underlying schema file read
 */
@Deprecated
public static Schema readSchemaInSchemaFile(String schemaFilePath) throws IOException {
  org.apache.carbondata.format.TableInfo thriftTableInfo =
      CarbonUtil.readSchemaFile(schemaFilePath);
  SchemaConverter converter = new ThriftWrapperSchemaConverterImpl();
  // database name, table name and table path are passed as empty strings
  TableSchema factTable =
      converter.fromExternalToWrapperTableInfo(thriftTableInfo, "", "", "").getFactTable();
  return new Schema(factTable.getListOfColumns());
}
/**
 * Lists the files of {@code path} whose names end with {@code extension}.
 *
 * @param path      carbon file path; it must not itself end with {@code extension}
 * @param extension carbon file extension used to filter the listing
 * @param conf      hadoop configuration support, can set s3a AK,SK,
 *                  end point and other conf with this
 * @return non-empty CarbonFile array with the matching files
 * @throws IOException if the listing is null or holds no matching file
 * @throws CarbonDataLoadingException if {@code path} already ends with {@code extension}
 */
private static CarbonFile[] getCarbonFile(String path, final String extension, Configuration conf)
    throws IOException {
  if (path.endsWith(extension)) {
    throw new CarbonDataLoadingException("Please ensure path "
        + path + " end with " + extension);
  }
  CarbonFileFilter extensionFilter = new CarbonFileFilter() {
    @Override
    public boolean accept(CarbonFile file) {
      // null entries are rejected instead of failing on getName()
      return file != null && file.getName().endsWith(extension);
    }
  };
  CarbonFile[] matches = FileFactory.getCarbonFile(path, conf).listFiles(extensionFilter);
  if (matches == null || matches.length == 0) {
    throw new IOException("Carbon file not exists.");
  }
  return matches;
}
/**
 * Reads the schema from a path using a freshly created hadoop {@link Configuration}.
 * The path can be a folder path, a carbonindex file path or a carbondata file path.
 * Schema validation across all files is not requested.
 *
 * @param path file/folder path
 * @return schema
 * @throws IOException declared by this method; propagated from the delegate
 */
public static Schema readSchema(String path) throws IOException {
  return readSchema(path, false, new Configuration());
}
/**
 * Reads the carbon schema from {@code path}, takes it in origin order and returns the
 * serialized array produced by an {@link ArrowConverter} built from it.
 * The byte[] can be converted back to an arrow schema by other arrow interface modules
 * like pyspark etc.
 *
 * @param path file/folder path handed to {@code readSchema(String)}
 * @return serialized arrow schema bytes
 * @throws IOException declared by this method; propagated from reading the schema
 */
public static byte[] getArrowSchemaAsBytes(String path) throws IOException {
  Schema originOrderSchema = readSchema(path).asOriginOrder();
  return new ArrowConverter(originOrderSchema, 0).toSerializeArray();
}
/**
 * Reads the schema from a path with the supplied hadoop configuration.
 * The path can be a folder path, a carbonindex file path or a carbondata file path.
 * Schema validation across all files is not requested.
 *
 * @param path file/folder path
 * @param conf hadoop configuration support, can set s3a AK,SK,end point and other conf with this
 * @return schema
 * @throws IOException declared by this method; propagated from the delegate
 */
public static Schema readSchema(String path, Configuration conf) throws IOException {
  final boolean validateSchema = false;
  return readSchema(path, validateSchema, conf);
}
/**
 * Reads the schema from a path. Resolution order:
 * <ol>
 *   <li>if the schema file derived from {@code path} exists, it is read;</li>
 *   <li>if {@code path} ends with the carbonindex extension, that index file is read;</li>
 *   <li>if {@code path} ends with the carbondata extension, that data file is read;</li>
 *   <li>otherwise {@code path} is treated as a folder: without validation the schema is
 *       read from the folder, with validation the first carbonindex file supplies the
 *       schema and every other carbonindex and carbondata file is compared against it.</li>
 * </ol>
 *
 * @param path           file/folder path
 * @param validateSchema whether check all files schema
 * @param conf           hadoop configuration support, can set s3a AK,SK,
 *                       end point and other conf with this
 * @return schema
 * @throws IOException declared by this method; also raised when a folder listing finds no file
 * @throws CarbonDataLoadingException if validation finds differing schemas
 */
public static Schema readSchema(String path, boolean validateSchema, Configuration conf)
    throws IOException {
  // Check whether it is transactional table reads the schema
  String schemaFilePath = CarbonTablePath.getSchemaFilePath(path, conf);
  if (FileFactory.getCarbonFile(schemaFilePath, conf).exists()) {
    return readSchemaInSchemaFile(schemaFilePath, conf);
  }
  if (path.endsWith(INDEX_FILE_EXT)) {
    return readSchemaFromIndexFile(path, conf);
  }
  if (path.endsWith(CARBON_DATA_EXT)) {
    return readSchemaFromDataFile(path, conf);
  }
  if (!validateSchema) {
    return readSchemaFromFolder(path, conf);
  }

  CarbonFile[] indexFiles = getCarbonFile(path, INDEX_FILE_EXT, conf);
  if (indexFiles == null || indexFiles.length == 0) {
    throw new CarbonDataLoadingException("No carbonindex file in this path.");
  }
  // the first index file is the reference every other file must agree with
  Schema reference = readSchemaFromIndexFile(indexFiles[0].getAbsolutePath(), conf);
  for (int position = 1; position < indexFiles.length; position++) {
    requireSameSchema(reference,
        readSchemaFromIndexFile(indexFiles[position].getAbsolutePath(), conf));
  }
  for (CarbonFile dataFile : getCarbonFile(path, CARBON_DATA_EXT, conf)) {
    requireSameSchema(reference, readSchemaFromDataFile(dataFile.getAbsolutePath(), conf));
  }
  return reference;
}

/**
 * Fails when {@code candidate} is not equal to {@code reference}.
 */
private static void requireSameSchema(Schema reference, Schema candidate) {
  if (reference.equals(candidate)) {
    return;
  }
  throw new CarbonDataLoadingException("Schema is different between different files.");
}
/**
 * Reads a schema file with the given configuration and builds a {@link Schema} from the
 * fact table's column list together with its table properties.
 *
 * @param schemaFilePath complete path including schema file name
 * @param conf           hadoop configuration used to read the schema file
 * @return schema holding the fact table columns and table properties
 * @throws IOException declared by this method; raised by the underlying schema file read
 */
private static Schema readSchemaInSchemaFile(String schemaFilePath, Configuration conf)
    throws IOException {
  org.apache.carbondata.format.TableInfo thriftTableInfo =
      CarbonUtil.readSchemaFile(schemaFilePath, conf);
  SchemaConverter converter = new ThriftWrapperSchemaConverterImpl();
  TableSchema factTable = converter
      .fromExternalToWrapperTableInfo(thriftTableInfo, "", "", "")
      .getFactTable();
  return new Schema(factTable.getListOfColumns(), factTable.getTableProperties());
}
/**
 * Reads the schema from a path using a freshly created hadoop {@link Configuration}.
 * The path can be a folder path, a carbonindex file path or a carbondata file path,
 * and the caller decides whether the schema of all files is checked.
 *
 * @param path           file/folder path
 * @param validateSchema whether check all files schema
 * @return schema
 * @throws IOException declared by this method; propagated from the delegate
 */
public static Schema readSchema(String path, boolean validateSchema) throws IOException {
  return readSchema(path, validateSchema, new Configuration());
}
/**
 * Reads a carbondata file and returns the schema, without validating other files.
 * This interface will be removed.
 *
 * @param dataFilePath carbondata file store path
 * @return Schema object
 * @throws IOException declared by this method; propagated from the delegate
 * @deprecated please use {@code readSchema} instead of this interface
 */
@Deprecated
public static Schema readSchemaInDataFile(String dataFilePath) throws IOException {
  final boolean validateSchema = false;
  return readSchema(dataFilePath, validateSchema);
}
/**
 * Reads the schema from the header of a carbondata file, keeping only the columns
 * for which {@code isComplexColumn()} is false.
 *
 * @param dataFilePath carbondata file path
 * @param conf         hadoop configuration support, can set s3a AK,SK,
 *                     end point and other conf with this
 * @return carbon data schema
 * @throws IOException declared by this method; raised while reading the file header
 */
private static Schema readSchemaFromDataFile(String dataFilePath, Configuration conf)
    throws IOException {
  CarbonHeaderReader headerReader = new CarbonHeaderReader(dataFilePath, conf);
  List<ColumnSchema> keptColumns = new ArrayList<ColumnSchema>();
  for (ColumnSchema headerColumn : headerReader.readSchema()) {
    if (headerColumn.isComplexColumn()) {
      continue;
    }
    keptColumns.add(headerColumn);
  }
  return new Schema(keptColumns);
}
/**
 * Reads a carbonindex file and returns the schema, without validating other files.
 * This interface will be removed.
 *
 * @param indexFilePath complete path including index file name
 * @return schema object
 * @throws IOException declared by this method; propagated from the delegate
 * @deprecated please use {@code readSchema} instead of this interface
 */
@Deprecated
public static Schema readSchemaInIndexFile(String indexFilePath) throws IOException {
  final boolean validateSchema = false;
  return readSchema(indexFilePath, validateSchema);
}
/**
 * Converts the children that {@code table.getChildren(columnName)} reports for a complex
 * column into {@link StructField}s, in the order they are reported.
 * <ul>
 *   <li>struct children are resolved through {@link #getStructChildren};</li>
 *   <li>array children are resolved through {@link #getArrayChildren};</li>
 *   <li>map children are not supported yet and are skipped;</li>
 *   <li>any other child becomes a field with the child's column name and data type.</li>
 * </ul>
 * Every child is visited. Earlier revisions returned right after the first struct/array
 * child, which dropped all siblings declared after it.
 * NOTE(review): this assumes {@code getChildren} yields only the direct children of the
 * column - confirm against CarbonTable.
 *
 * @param table      table that owns the column
 * @param columnName name of the complex column whose children are requested
 * @return one field per supported child
 */
public static List<StructField> getChildrenCommon(CarbonTable table, String columnName) {
  List<CarbonDimension> children = table.getChildren(columnName);
  List<StructField> structFields = new ArrayList<StructField>();
  for (CarbonDimension child : children) {
    if (DataTypes.isStructType(child.getDataType())) {
      // no early return: siblings after a nested struct must still be collected
      structFields.add(getStructChildren(table, child.getColName()));
    } else if (DataTypes.isArrayType(child.getDataType())) {
      // no early return: siblings after a nested array must still be collected
      structFields.add(getArrayChildren(table, child.getColName()));
    } else if (DataTypes.isMapType(child.getDataType())) {
      //TODO map children are not converted yet
    } else {
      ColumnSchema columnSchema = child.getColumnSchema();
      structFields.add(new StructField(columnSchema.getColumnName(), columnSchema.getDataType()));
    }
  }
  return structFields;
}
/**
 * Builds a struct-typed field named {@code columnName} whose struct type is created from
 * the fields returned by {@link #getChildrenCommon}.
 *
 * @param table      table that owns the column
 * @param columnName name of the struct column
 * @return field named {@code columnName} with a struct type over its children
 */
public static StructField getStructChildren(CarbonTable table, String columnName) {
  return new StructField(columnName,
      DataTypes.createStructType(getChildrenCommon(table, columnName)));
}
/**
 * Returns the first field that {@link #getChildrenCommon} yields for {@code columnName}.
 *
 * @param table      table that owns the column
 * @param columnName name of the array column
 * @return first child field of the column
 */
public static StructField getArrayChildren(CarbonTable table, String columnName) {
  return getChildrenCommon(table, columnName).get(0);
}
/**
 * Reads the schema from a carbon file folder path.
 * A table with a random "UnknownTable" name is built from the folder, and every
 * fact-table column whose name does not contain {@code CarbonCommonConstants.POINT}
 * becomes one {@link Field}:
 * <ul>
 *   <li>struct columns get a struct type built from {@link #getStructChildren};</li>
 *   <li>array columns get an "array" field built from {@link #getArrayChildren};</li>
 *   <li>map columns are not supported yet and are left out of the result;</li>
 *   <li>all other columns are wrapped directly.</li>
 * </ul>
 * Fields are gathered in a list so that skipped map columns do not leave {@code null}
 * slots in the array handed to {@link Schema}.
 *
 * @param folderPath carbon file folder path
 * @param conf       hadoop configuration support, can set s3a AK,SK,
 *                   end point and other conf with this
 * @return carbon data Schema
 * @throws IOException declared by this method; raised while building the table
 */
private static Schema readSchemaFromFolder(String folderPath, Configuration conf)
    throws IOException {
  String tableName = "UnknownTable" + UUID.randomUUID();
  CarbonTable table = CarbonTable.buildTable(folderPath, tableName, conf);
  List<ColumnSchema> columnSchemaList = table.getTableInfo().getFactTable().getListOfColumns();
  List<Field> fields = new ArrayList<>(columnSchemaList.size());
  for (ColumnSchema columnSchema : columnSchemaList) {
    String columnName = columnSchema.getColumnName();
    if (columnName.contains(CarbonCommonConstants.POINT)) {
      // such columns are not emitted as fields of their own here
      continue;
    }
    Field field;
    if (DataTypes.isStructType(columnSchema.getDataType())) {
      List<StructField> list = new ArrayList<>();
      list.add(getStructChildren(table, columnName));
      field = new Field(columnName, DataTypes.createStructType(list));
    } else if (DataTypes.isArrayType(columnSchema.getDataType())) {
      List<StructField> list = new ArrayList<>();
      list.add(getArrayChildren(table, columnName));
      field = new Field(columnName, "array", list);
    } else if (DataTypes.isMapType(columnSchema.getDataType())) {
      //TODO map columns are not converted yet; skip without reserving a slot
      continue;
    } else {
      field = new Field(columnSchema);
    }
    field.setSchemaOrdinal(columnSchema.getSchemaOrdinal());
    fields.add(field);
  }
  return new Schema(fields.toArray(new Field[0]));
}
/**
 * Reads the schema from the header of a carbonindex file, keeping only the columns
 * whose name does not contain a "." and converting each kept thrift column to its
 * wrapper column schema.
 *
 * @param indexFilePath carbonindex file path
 * @param conf          hadoop configuration support, can set s3a AK,SK,
 *                      end point and other conf with this
 * @return carbon data Schema
 */
private static Schema readSchemaFromIndexFile(String indexFilePath, Configuration conf) {
  IndexHeader indexHeader = SegmentIndexFileStore.readIndexHeader(indexFilePath, conf);
  List<org.apache.carbondata.format.ColumnSchema> thriftColumns =
      indexHeader.getTable_columns();
  List<ColumnSchema> keptColumns = new ArrayList<ColumnSchema>();
  for (org.apache.carbondata.format.ColumnSchema thriftColumn : thriftColumns) {
    if (thriftColumn.column_name.contains(".")) {
      continue;
    }
    keptColumns.add(thriftColumnSchemaToWrapperColumnSchema(thriftColumn));
  }
  return new Schema(keptColumns);
}
/**
 * Returns the version details of a carbondata file as a formatted string.
 * The long stored in the last 8 bytes of the file is read and handed to
 * {@link CarbonFooterReaderV3} (presumably the footer offset - confirm against the
 * reader), and the footer's extra info supplies the "written by" and version values.
 *
 * @param dataFilePath carbondata file path
 * @return "&lt;written by&gt; in version: &lt;version&gt;" when the footer carries extra info,
 *         otherwise a fixed message saying the details were not found
 * @throws IOException if the file is shorter than 8 bytes, or if reading the file fails
 */
public static String getVersionDetails(String dataFilePath) throws IOException {
  final int trailerLength = 8;
  long fileSize = FileFactory.getCarbonFile(dataFilePath).getSize();
  if (fileSize < trailerLength) {
    // a shorter file would otherwise be read at a negative offset
    throw new IOException("File " + dataFilePath + " is too small (" + fileSize
        + " bytes) to contain the " + trailerLength + "-byte trailer");
  }
  FileReader fileReader = FileFactory.getFileHolder(FileFactory.getFileType(dataFilePath));
  ByteBuffer buffer;
  try {
    buffer = fileReader.readByteBuffer(
        FileFactory.getUpdatedFilePath(dataFilePath), fileSize - trailerLength, trailerLength);
  } finally {
    // release the reader even when the read fails
    fileReader.finish();
  }
  CarbonFooterReaderV3 footerReader =
      new CarbonFooterReaderV3(dataFilePath, buffer.getLong());
  FileFooter3 footer = footerReader.readFooterVersion3();
  if (null == footer.getExtra_info()) {
    return "Version Details are not found in carbondata file";
  }
  return footer.getExtra_info().get(CarbonCommonConstants.CARBON_WRITTEN_BY_FOOTER_INFO)
      + " in version: " + footer.getExtra_info()
      .get(CarbonCommonConstants.CARBON_WRITTEN_VERSION);
}
}