/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.carbondata.hadoop.testutil;

import java.io.BufferedWriter;
import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.nio.charset.Charset;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.UUID;
import org.apache.carbondata.common.CarbonIterator;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datamap.DataMapStoreManager;
import org.apache.carbondata.core.datastore.compression.CompressorFactory;
import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.fileoperations.AtomicFileOperationFactory;
import org.apache.carbondata.core.fileoperations.AtomicFileOperations;
import org.apache.carbondata.core.fileoperations.FileWriteOperation;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
import org.apache.carbondata.core.metadata.CarbonMetadata;
import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
import org.apache.carbondata.core.metadata.converter.SchemaConverter;
import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl;
import org.apache.carbondata.core.metadata.datatype.DataTypes;
import org.apache.carbondata.core.metadata.encoder.Encoding;
import org.apache.carbondata.core.metadata.schema.SchemaEvolution;
import org.apache.carbondata.core.metadata.schema.SchemaEvolutionEntry;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.metadata.schema.table.TableInfo;
import org.apache.carbondata.core.metadata.schema.table.TableSchema;
import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
import org.apache.carbondata.core.statusmanager.SegmentStatus;
import org.apache.carbondata.core.util.CarbonProperties;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.core.util.DataTypeUtil;
import org.apache.carbondata.core.util.path.CarbonTablePath;
import org.apache.carbondata.core.writer.ThriftWriter;
import org.apache.carbondata.processing.loading.DataLoadExecutor;
import org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants;
import org.apache.carbondata.processing.loading.csvinput.BlockDetails;
import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat;
import org.apache.carbondata.processing.loading.csvinput.CSVRecordReaderIterator;
import org.apache.carbondata.processing.loading.csvinput.StringArrayWritable;
import org.apache.carbondata.processing.loading.model.CarbonDataLoadSchema;
import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
import org.apache.carbondata.processing.util.TableOptionConstant;
import com.google.gson.Gson;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.TaskAttemptID;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
import org.apache.log4j.Logger;

/**
* Creates a CarbonData store (schema file, data files and table status)
* for a fixed test table, based on the column schema defined in this class.
*
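* <p>A minimal usage sketch (the paths are illustrative only):
* <pre>
* StoreCreator creator = new StoreCreator("/tmp/carbon/store", "/tmp/data.csv");
* creator.createCarbonStore();
* </pre>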
*/
public class StoreCreator {
private static final Logger LOG =
LogServiceFactory.getLogService(StoreCreator.class.getCanonicalName());
private AbsoluteTableIdentifier absoluteTableIdentifier;
private String storePath = null;
private String csvPath;
private List<String> sortColumns = new ArrayList<>();
public StoreCreator(String storePath, String csvPath) {
this.storePath = storePath;
this.csvPath = csvPath;
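// fixed test schema: database "testdb", table "testtable", sorted on the string dimensions below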
String dbName = "testdb";
String tableName = "testtable";
sortColumns.add("date");
sortColumns.add("country");
sortColumns.add("name");
sortColumns.add("phonetype");
sortColumns.add("serialname");
absoluteTableIdentifier = AbsoluteTableIdentifier.from(storePath + "/" + dbName + "/" + tableName,
new CarbonTableIdentifier(dbName, tableName, UUID.randomUUID().toString()));
}
public AbsoluteTableIdentifier getAbsoluteTableIdentifier() {
return absoluteTableIdentifier;
}
public static CarbonLoadModel buildCarbonLoadModel(CarbonTable table, String factFilePath,
AbsoluteTableIdentifier absoluteTableIdentifier) {
CarbonDataLoadSchema schema = new CarbonDataLoadSchema(table);
CarbonLoadModel loadModel = new CarbonLoadModel();
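// use the compressor configured in the table properties, falling back to the system default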
String columnCompressor = table.getTableInfo().getFactTable().getTableProperties().get(
CarbonCommonConstants.COMPRESSOR);
if (columnCompressor == null) {
columnCompressor = CompressorFactory.getInstance().getCompressor().getName();
}
loadModel.setColumnCompressor(columnCompressor);
loadModel.setCarbonDataLoadSchema(schema);
loadModel.setDatabaseName(absoluteTableIdentifier.getCarbonTableIdentifier().getDatabaseName());
loadModel.setTableName(absoluteTableIdentifier.getCarbonTableIdentifier().getTableName());
loadModel.setFactFilePath(factFilePath);
loadModel.setLoadMetadataDetails(new ArrayList<LoadMetadataDetails>());
loadModel.setTablePath(absoluteTableIdentifier.getTablePath());
loadModel.setDateFormat(null);
loadModel.setCarbonTransactionalTable(table.isTransactionalTable());
loadModel.setDefaultTimestampFormat(CarbonProperties.getInstance().getProperty(
CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
CarbonCommonConstants.CARBON_TIMESTAMP_MILLIS));
loadModel.setDefaultDateFormat(CarbonProperties.getInstance().getProperty(
CarbonCommonConstants.CARBON_DATE_FORMAT,
CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT));
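// the load options below are encoded as "<optionName>,<value>" strings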
loadModel.setSerializationNullFormat(
TableOptionConstant.SERIALIZATION_NULL_FORMAT.getName() + ",\\N");
loadModel.setBadRecordsLoggerEnable(
TableOptionConstant.BAD_RECORDS_LOGGER_ENABLE.getName() + ",false");
loadModel.setBadRecordsAction(
TableOptionConstant.BAD_RECORDS_ACTION.getName() + ",FORCE");
loadModel.setIsEmptyDataBadRecord(
DataLoadProcessorConstants.IS_EMPTY_DATA_BAD_RECORD + ",false");
loadModel.setCsvHeader("ID,date,country,name,phonetype,serialname,salary");
loadModel.setCsvHeaderColumns(loadModel.getCsvHeader().split(","));
loadModel.setTaskNo("0");
loadModel.setSegmentId("0");
loadModel.setFactTimeStamp(System.currentTimeMillis());
loadModel.setMaxColumns("10");
return loadModel;
}
/**
* Create store without any restructure: create the table, then load the CSV data
*/
public CarbonLoadModel createCarbonStore() throws Exception {
CarbonLoadModel loadModel = createTableAndLoadModel();
loadData(loadModel, storePath);
return loadModel;
}
/**
* Create store without any restructure, using the given load model
*/
public void createCarbonStore(CarbonLoadModel loadModel) throws Exception {
loadData(loadModel, storePath);
}
/**
* Clear the data maps cached for this table
*/
public void clearDataMaps() throws IOException {
DataMapStoreManager.getInstance().clearDataMaps(absoluteTableIdentifier);
}
public CarbonLoadModel createTableAndLoadModel(boolean deleteOldStore) throws Exception {
if (deleteOldStore) {
File storeDir = new File(storePath);
CarbonUtil.deleteFoldersAndFiles(storeDir);
}
CarbonTable table = createTable(absoluteTableIdentifier);
return buildCarbonLoadModel(table, csvPath, absoluteTableIdentifier);
}
public CarbonLoadModel createTableAndLoadModel() throws Exception {
return createTableAndLoadModel(true);
}
public CarbonTable createTable(
AbsoluteTableIdentifier identifier) throws IOException {
TableInfo tableInfo = new TableInfo();
tableInfo.setDatabaseName(identifier.getCarbonTableIdentifier().getDatabaseName());
TableSchema tableSchema = new TableSchema();
tableSchema.setTableName(identifier.getCarbonTableIdentifier().getTableName());
List<ColumnSchema> columnSchemas = new ArrayList<ColumnSchema>();
int schemaOrdinal = 0;
// dimension columns; "id" is an INT dimension, the others are STRING.
// a column is flagged as a sort column only when it appears in sortColumns
String[] dimensionNames = {"id", "date", "country", "name", "phonetype", "serialname"};
for (String columnName : dimensionNames) {
ColumnSchema dimension = new ColumnSchema();
dimension.setColumnName(columnName);
dimension.setDataType("id".equals(columnName) ? DataTypes.INT : DataTypes.STRING);
dimension.setEncodingList(new ArrayList<Encoding>());
dimension.setColumnUniqueId(UUID.randomUUID().toString());
dimension.setColumnReferenceId(dimension.getColumnUniqueId());
dimension.setDimensionColumn(true);
dimension.setSchemaOrdinal(schemaOrdinal++);
if (sortColumns.contains(columnName)) {
dimension.setSortColumn(true);
}
columnSchemas.add(dimension);
}
// the single measure column
ColumnSchema salary = new ColumnSchema();
salary.setColumnName("salary");
salary.setDataType(DataTypes.INT);
salary.setEncodingList(new ArrayList<Encoding>());
salary.setColumnUniqueId(UUID.randomUUID().toString());
salary.setDimensionColumn(false);
salary.setColumnReferenceId(salary.getColumnUniqueId());
salary.setSchemaOrdinal(schemaOrdinal++);
columnSchemas.add(salary);
// rearrange the column schemas so that sort columns come first, if any exist
tableSchema.setListOfColumns(reArrangeColumnSchema(columnSchemas));
SchemaEvolution schemaEvol = new SchemaEvolution();
schemaEvol.setSchemaEvolutionEntryList(new ArrayList<SchemaEvolutionEntry>());
tableSchema.setSchemaEvolution(schemaEvol);
tableSchema.setTableId(UUID.randomUUID().toString());
tableInfo.setTableUniqueName(
identifier.getCarbonTableIdentifier().getTableUniqueName()
);
tableInfo.setLastUpdatedTime(System.currentTimeMillis());
tableInfo.setFactTable(tableSchema);
tableInfo.setTablePath(identifier.getTablePath());
String schemaFilePath = CarbonTablePath.getSchemaFilePath(identifier.getTablePath());
String schemaMetadataPath = CarbonTablePath.getFolderContainingFile(schemaFilePath);
CarbonMetadata.getInstance().loadTableMetadata(tableInfo);
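// convert the in-memory schema to its thrift form and persist it as the schema file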
SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl();
org.apache.carbondata.format.TableInfo thriftTableInfo =
schemaConverter.fromWrapperToExternalTableInfo(
tableInfo,
tableInfo.getDatabaseName(),
tableInfo.getFactTable().getTableName());
org.apache.carbondata.format.SchemaEvolutionEntry schemaEvolutionEntry =
new org.apache.carbondata.format.SchemaEvolutionEntry(tableInfo.getLastUpdatedTime());
thriftTableInfo.getFact_table().getSchema_evolution().getSchema_evolution_history()
.add(schemaEvolutionEntry);
if (!FileFactory.isFileExist(schemaMetadataPath)) {
FileFactory.mkdirs(schemaMetadataPath);
}
ThriftWriter thriftWriter = new ThriftWriter(schemaFilePath, false);
thriftWriter.open();
thriftWriter.write(thriftTableInfo);
thriftWriter.close();
return CarbonMetadata.getInstance().getCarbonTable(tableInfo.getTableUniqueName());
}
private List<ColumnSchema> reArrangeColumnSchema(List<ColumnSchema> columnSchemas) {
List<ColumnSchema> newColumnSchema = new ArrayList<>(columnSchemas.size());
// add sort columns first
for (ColumnSchema columnSchema : columnSchemas) {
if (columnSchema.isSortColumn()) {
newColumnSchema.add(columnSchema);
}
}
// add other dimension columns
for (ColumnSchema columnSchema : columnSchemas) {
if (!columnSchema.isSortColumn() && columnSchema.isDimensionColumn()) {
newColumnSchema.add(columnSchema);
}
}
// add measure columns
for (ColumnSchema columnSchema : columnSchemas) {
if (!columnSchema.isDimensionColumn()) {
newColumnSchema.add(columnSchema);
}
}
return newColumnSchema;
}
public void setSortColumns(List<String> sortColumns) {
this.sortColumns = sortColumns;
}
/**
* Load the CSV data into the store at the given location
*
* @param loadModel load model describing the table, input file and load options
* @param storeLocation root path of the store to write into
* @throws Exception if the data load fails
*/
public static void loadData(CarbonLoadModel loadModel, String storeLocation)
throws Exception {
File storeDir = new File(storeLocation);
if (!storeDir.exists() && !storeDir.mkdirs()) {
LOG.warn("failed to create store directory: " + storeLocation);
}
String outPutLoc = storeLocation + "/etl";
String databaseName = loadModel.getDatabaseName();
String tableName = loadModel.getTableName();
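// temporary write location for this load, registered under a table-specific property key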
String tempLocationKey = databaseName + '_' + tableName + "_1";
CarbonProperties.getInstance().addProperty(
tempLocationKey, storeLocation + "/" + databaseName + "/" + tableName);
CarbonProperties.getInstance().addProperty("store_output_location", outPutLoc);
CarbonProperties.getInstance().addProperty("send.signal.load", "false");
CarbonProperties.getInstance().addProperty("carbon.is.columnar.storage", "true");
CarbonProperties.getInstance().addProperty("carbon.dimension.split.value.in.columnar", "1");
CarbonProperties.getInstance().addProperty("carbon.is.fullyfilled.bits", "true");
CarbonProperties.getInstance().addProperty("is.int.based.indexer", "true");
CarbonProperties.getInstance().addProperty("aggregate.columnar.keyblock", "true");
CarbonProperties.getInstance().addProperty("is.compressed.keyblock", "false");
String graphPath =
outPutLoc + File.separator + loadModel.getDatabaseName() + File.separator + tableName
+ File.separator + 0 + File.separator + 1 + File.separator + tableName + ".ktr";
File path = new File(graphPath);
if (path.exists()) {
if (!path.delete()) {
LOG.warn("delete " + path + " failed");
}
}
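// treat the whole fact file as a single block, served from localhost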
BlockDetails blockDetails = new BlockDetails(new Path(loadModel.getFactFilePath()),
0, new File(loadModel.getFactFilePath()).length(), new String[] {"localhost"});
Configuration configuration = new Configuration();
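// configure the CSV reader using the settings carried by the load model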
CSVInputFormat.setCommentCharacter(configuration, loadModel.getCommentChar());
CSVInputFormat.setCSVDelimiter(configuration, loadModel.getCsvDelimiter());
CSVInputFormat.setEscapeCharacter(configuration, loadModel.getEscapeChar());
CSVInputFormat.setHeaderExtractionEnabled(configuration, true);
CSVInputFormat.setQuoteCharacter(configuration, loadModel.getQuoteChar());
CSVInputFormat.setReadBufferSize(configuration,
CarbonProperties.getInstance().getProperty(
CarbonCommonConstants.CSV_READ_BUFFER_SIZE,
CarbonCommonConstants.CSV_READ_BUFFER_SIZE_DEFAULT));
CSVInputFormat.setNumberOfColumns(
configuration, String.valueOf(loadModel.getCsvHeaderColumns().length));
CSVInputFormat.setMaxColumns(configuration, "10");
TaskAttemptContextImpl hadoopAttemptContext =
new TaskAttemptContextImpl(configuration, new TaskAttemptID("", 1, TaskType.MAP, 0, 0));
CSVInputFormat format = new CSVInputFormat();
RecordReader<NullWritable, StringArrayWritable> recordReader =
format.createRecordReader(blockDetails, hadoopAttemptContext);
CSVRecordReaderIterator readerIterator =
new CSVRecordReaderIterator(recordReader, blockDetails, hadoopAttemptContext);
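// reset cached date/timestamp formatters, then run the load with a single input iterator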
DataTypeUtil.clearFormatter();
new DataLoadExecutor().execute(loadModel,
new String[] {storeLocation + "/" + databaseName + "/" + tableName},
new CarbonIterator[]{readerIterator});
writeLoadMetadata(
loadModel.getCarbonDataLoadSchema(), loadModel.getDatabaseName(), loadModel.getTableName(),
new ArrayList<LoadMetadataDetails>());
}
public static void writeLoadMetadata(CarbonDataLoadSchema schema, String databaseName,
String tableName, List<LoadMetadataDetails> listOfLoadFolderDetails) throws IOException {
LoadMetadataDetails loadMetadataDetails = new LoadMetadataDetails();
loadMetadataDetails.setLoadEndTime(System.currentTimeMillis());
loadMetadataDetails.setSegmentStatus(SegmentStatus.SUCCESS);
loadMetadataDetails.setLoadName(String.valueOf(0));
loadMetadataDetails.setLoadStartTime(loadMetadataDetails.getTimeStamp(readCurrentTime()));
listOfLoadFolderDetails.add(loadMetadataDetails);
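// the table status file records every load (segment) as a JSON array of load details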
String dataLoadLocation = schema.getCarbonTable().getMetadataPath() + File.separator
+ CarbonTablePath.TABLE_STATUS_FILE;
DataOutputStream dataOutputStream;
Gson gsonObjectToWrite = new Gson();
BufferedWriter brWriter = null;
AtomicFileOperations writeOperation =
AtomicFileOperationFactory.getAtomicFileOperations(dataLoadLocation);
try {
dataOutputStream = writeOperation.openForWrite(FileWriteOperation.OVERWRITE);
brWriter = new BufferedWriter(new OutputStreamWriter(dataOutputStream,
Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)));
String metadataInstance = gsonObjectToWrite.toJson(listOfLoadFolderDetails.toArray());
brWriter.write(metadataInstance);
} catch (IOException ioe) {
LOG.error("Error message: " + ioe.getLocalizedMessage());
writeOperation.setFailed();
throw ioe;
} finally {
// flush any buffered content before closing
if (null != brWriter) {
brWriter.flush();
}
CarbonUtil.closeStreams(brWriter);
}
writeOperation.close();
}
public static String readCurrentTime() {
SimpleDateFormat sdf = new SimpleDateFormat(CarbonCommonConstants.CARBON_TIMESTAMP_MILLIS);
return sdf.format(new Date());
}
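// standalone entry point: creates a sample store under target/store from the bundled test CSV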
public static void main(String[] args) throws Exception {
new StoreCreator(new File("target/store").getAbsolutePath(),
new File("../hadoop/src/test/resources/data.csv").getCanonicalPath()).createCarbonStore();
}
}