/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.carbondata.hadoop.testutil;
import java.io.BufferedWriter;
import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.nio.charset.Charset;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.UUID;
import org.apache.carbondata.common.CarbonIterator;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datastore.compression.CompressorFactory;
import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.fileoperations.AtomicFileOperationFactory;
import org.apache.carbondata.core.fileoperations.AtomicFileOperations;
import org.apache.carbondata.core.fileoperations.FileWriteOperation;
import org.apache.carbondata.core.index.IndexStoreManager;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
import org.apache.carbondata.core.metadata.CarbonMetadata;
import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
import org.apache.carbondata.core.metadata.converter.SchemaConverter;
import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl;
import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.metadata.datatype.DataTypes;
import org.apache.carbondata.core.metadata.encoder.Encoding;
import org.apache.carbondata.core.metadata.schema.SchemaEvolution;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.metadata.schema.table.TableInfo;
import org.apache.carbondata.core.metadata.schema.table.TableSchema;
import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
import org.apache.carbondata.core.statusmanager.SegmentStatus;
import org.apache.carbondata.core.util.CarbonProperties;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.core.util.DataTypeUtil;
import org.apache.carbondata.core.util.path.CarbonTablePath;
import org.apache.carbondata.core.writer.ThriftWriter;
import org.apache.carbondata.processing.loading.DataLoadExecutor;
import org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants;
import org.apache.carbondata.processing.loading.csvinput.BlockDetails;
import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat;
import org.apache.carbondata.processing.loading.csvinput.CSVRecordReaderIterator;
import org.apache.carbondata.processing.loading.csvinput.StringArrayWritable;
import org.apache.carbondata.processing.loading.model.CarbonDataLoadSchema;
import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
import org.apache.carbondata.processing.util.TableOptionConstant;
import com.google.gson.Gson;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.TaskAttemptID;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
import org.apache.log4j.Logger;
/**
 * This class creates a CarbonData store (schema file, carbondata files and
 * table status) for a hard-coded test table, loading rows from the given CSV file.
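 *
 * A minimal usage sketch (the paths below are illustrative):
 * <pre>{@code
 *   StoreCreator creator = new StoreCreator(
 *       new File("target/store").getAbsolutePath(),
 *       new File("data.csv").getCanonicalPath());
 *   CarbonLoadModel loadModel = creator.createCarbonStore();  // create schema and load the CSV
 *   creator.clearIndexes();                                   // drop cached indexes afterwards
 * }</pre>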
*/
public class StoreCreator {
private static final Logger LOG =
LogServiceFactory.getLogService(StoreCreator.class.getCanonicalName());
private final AbsoluteTableIdentifier absoluteTableIdentifier;
private final String storePath;
private final String csvPath;
private List<String> sortColumns = new ArrayList<>();
public StoreCreator(String storePath, String csvPath) {
this.storePath = storePath;
this.csvPath = csvPath;
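    // fixed test table testdb.testtable; by default all String dimensions act as sort columns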
String dbName = "testdb";
String tableName = "testtable";
sortColumns.add("date");
sortColumns.add("country");
sortColumns.add("name");
sortColumns.add("phonetype");
sortColumns.add("serialname");
    absoluteTableIdentifier = AbsoluteTableIdentifier.from(storePath + "/" + dbName + "/" + tableName,
        new CarbonTableIdentifier(dbName, tableName, UUID.randomUUID().toString()));
}
public AbsoluteTableIdentifier getAbsoluteTableIdentifier() {
return absoluteTableIdentifier;
}
public static CarbonLoadModel buildCarbonLoadModel(CarbonTable table, String factFilePath,
AbsoluteTableIdentifier absoluteTableIdentifier) {
CarbonDataLoadSchema schema = new CarbonDataLoadSchema(table);
CarbonLoadModel loadModel = new CarbonLoadModel();
String columnCompressor = table.getTableInfo().getFactTable().getTableProperties().get(
CarbonCommonConstants.COMPRESSOR);
if (columnCompressor == null) {
columnCompressor = CompressorFactory.getInstance().getCompressor().getName();
}
loadModel.setColumnCompressor(columnCompressor);
loadModel.setCarbonDataLoadSchema(schema);
loadModel.setDatabaseName(absoluteTableIdentifier.getCarbonTableIdentifier().getDatabaseName());
    loadModel.setTableName(absoluteTableIdentifier.getCarbonTableIdentifier().getTableName());
loadModel.setFactFilePath(factFilePath);
loadModel.setLoadMetadataDetails(new ArrayList<>());
loadModel.setTablePath(absoluteTableIdentifier.getTablePath());
loadModel.setDateFormat(null);
loadModel.setCarbonTransactionalTable(table.isTransactionalTable());
loadModel.setDefaultTimestampFormat(CarbonProperties.getInstance().getProperty(
CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
CarbonCommonConstants.CARBON_TIMESTAMP_MILLIS));
loadModel.setDefaultDateFormat(CarbonProperties.getInstance().getProperty(
CarbonCommonConstants.CARBON_DATE_FORMAT,
CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT));
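    // these load options are each encoded as a single "optionName,value" string,
    // e.g. "bad_records_action,FORCE"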
    loadModel.setSerializationNullFormat(
        TableOptionConstant.SERIALIZATION_NULL_FORMAT.getName() + ",\\N");
    loadModel.setBadRecordsLoggerEnable(
        TableOptionConstant.BAD_RECORDS_LOGGER_ENABLE.getName() + ",false");
    loadModel.setBadRecordsAction(
        TableOptionConstant.BAD_RECORDS_ACTION.getName() + ",FORCE");
    loadModel.setIsEmptyDataBadRecord(
        DataLoadProcessorConstants.IS_EMPTY_DATA_BAD_RECORD + ",false");
loadModel.setCsvHeader("ID,date,country,name,phonetype,serialname,salary");
loadModel.setCsvHeaderColumns(loadModel.getCsvHeader().split(","));
loadModel.setTaskNo("0");
loadModel.setSegmentId("0");
loadModel.setFactTimeStamp(System.currentTimeMillis());
loadModel.setMaxColumns("10");
return loadModel;
}
/**
 * Create the table and load the CSV data into a fresh store, without any restructure
 */
public CarbonLoadModel createCarbonStore() throws Exception {
CarbonLoadModel loadModel = createTableAndLoadModel();
loadData(loadModel, storePath);
return loadModel;
}
/**
 * Load data into the already-created store using the given load model, without any restructure
 */
public void createCarbonStore(CarbonLoadModel loadModel) throws Exception {
loadData(loadModel, storePath);
}
/**
 * Clear the indexes cached for this table
 */
public void clearIndexes() {
IndexStoreManager.getInstance().clearIndex(absoluteTableIdentifier);
}
public CarbonLoadModel createTableAndLoadModel(boolean deleteOldStore) throws Exception {
if (deleteOldStore) {
File storeDir = new File(storePath);
CarbonUtil.deleteFoldersAndFiles(storeDir);
}
CarbonTable table = createTable(absoluteTableIdentifier);
return buildCarbonLoadModel(table, csvPath, absoluteTableIdentifier);
}
public CarbonLoadModel createTableAndLoadModel() throws Exception {
return createTableAndLoadModel(true);
}
public CarbonTable createTable(
AbsoluteTableIdentifier identifier) throws IOException {
TableInfo tableInfo = new TableInfo();
tableInfo.setDatabaseName(identifier.getCarbonTableIdentifier().getDatabaseName());
TableSchema tableSchema = new TableSchema();
tableSchema.setTableName(identifier.getCarbonTableIdentifier().getTableName());
List<ColumnSchema> columnSchemas = new ArrayList<>();
ArrayList<Encoding> encodings = new ArrayList<>();
int schemaOrdinal = 0;
addColumn(columnSchemas, encodings, schemaOrdinal++, "id", DataTypes.INT, true);
addColumn(columnSchemas, encodings, schemaOrdinal++, "date", DataTypes.STRING, true);
addColumn(columnSchemas, encodings, schemaOrdinal++, "country", DataTypes.STRING, true);
addColumn(columnSchemas, encodings, schemaOrdinal++, "name", DataTypes.STRING, true);
addColumn(columnSchemas, encodings, schemaOrdinal++, "phonetype", DataTypes.STRING, true);
addColumn(columnSchemas, encodings, schemaOrdinal++, "serialname", DataTypes.STRING, true);
addColumn(columnSchemas, encodings, schemaOrdinal, "salary", DataTypes.INT, false);
    // rearrange the column schema based on the sort order, if sort columns exist
List<ColumnSchema> columnSchemas1 = reArrangeColumnSchema(columnSchemas);
tableSchema.setListOfColumns(columnSchemas1);
SchemaEvolution schemaEvolution = new SchemaEvolution();
schemaEvolution.setSchemaEvolutionEntryList(new ArrayList<>());
tableSchema.setSchemaEvolution(schemaEvolution);
tableSchema.setTableId(UUID.randomUUID().toString());
tableInfo.setTableUniqueName(
identifier.getCarbonTableIdentifier().getTableUniqueName()
);
tableInfo.setLastUpdatedTime(System.currentTimeMillis());
tableInfo.setFactTable(tableSchema);
tableInfo.setTablePath(identifier.getTablePath());
String schemaFilePath = CarbonTablePath.getSchemaFilePath(identifier.getTablePath());
String schemaMetadataPath = CarbonTablePath.getFolderContainingFile(schemaFilePath);
CarbonMetadata.getInstance().loadTableMetadata(tableInfo);
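    // convert the wrapper TableInfo to its thrift form and persist it as the schema file on disk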
SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl();
org.apache.carbondata.format.TableInfo thriftTableInfo =
schemaConverter.fromWrapperToExternalTableInfo(
tableInfo,
tableInfo.getDatabaseName(),
tableInfo.getFactTable().getTableName());
org.apache.carbondata.format.SchemaEvolutionEntry schemaEvolutionEntry =
new org.apache.carbondata.format.SchemaEvolutionEntry(tableInfo.getLastUpdatedTime());
thriftTableInfo.getFact_table().getSchema_evolution().getSchema_evolution_history()
.add(schemaEvolutionEntry);
if (!FileFactory.isFileExist(schemaMetadataPath)) {
FileFactory.mkdirs(schemaMetadataPath);
}
ThriftWriter thriftWriter = new ThriftWriter(schemaFilePath, false);
thriftWriter.open();
thriftWriter.write(thriftTableInfo);
thriftWriter.close();
return CarbonMetadata.getInstance().getCarbonTable(tableInfo.getTableUniqueName());
}
  private void addColumn(List<ColumnSchema> columnSchemas, ArrayList<Encoding> encodings,
      int schemaOrdinal, String columnName, DataType dataType, boolean isDimensionColumn) {
    ColumnSchema columnSchema = new ColumnSchema();
    columnSchema.setColumnName(columnName);
    columnSchema.setDataType(dataType);
    columnSchema.setEncodingList(encodings);
    columnSchema.setColumnUniqueId(UUID.randomUUID().toString());
    columnSchema.setColumnReferenceId(columnSchema.getColumnUniqueId());
    columnSchema.setDimensionColumn(isDimensionColumn);
    columnSchema.setSchemaOrdinal(schemaOrdinal);
    if (sortColumns.contains(columnSchema.getColumnName())) {
      columnSchema.setSortColumn(true);
    }
    columnSchemas.add(columnSchema);
  }
private List<ColumnSchema> reArrangeColumnSchema(List<ColumnSchema> columnSchemas) {
List<ColumnSchema> newColumnSchema = new ArrayList<>(columnSchemas.size());
// add sort columns first
for (ColumnSchema columnSchema : columnSchemas) {
if (columnSchema.isSortColumn()) {
newColumnSchema.add(columnSchema);
}
}
// add other dimension columns
for (ColumnSchema columnSchema : columnSchemas) {
if (!columnSchema.isSortColumn() && columnSchema.isDimensionColumn()) {
newColumnSchema.add(columnSchema);
}
}
// add measure columns
for (ColumnSchema columnSchema : columnSchemas) {
if (!columnSchema.isDimensionColumn()) {
newColumnSchema.add(columnSchema);
}
}
return newColumnSchema;
}
public void setSortColumns(List<String> sortColumns) {
this.sortColumns = sortColumns;
}
/**
* Execute graph which will further load data
*/
public static void loadData(CarbonLoadModel loadModel, String storeLocation)
throws Exception {
    // mkdirs() returns false when creation fails or the directory already exists
    if (!new File(storeLocation).mkdirs()) {
      LOG.warn("failed to create store directory: " + storeLocation);
    }
    String outputLoc = storeLocation + "/etl";
String databaseName = loadModel.getDatabaseName();
String tableName = loadModel.getTableName();
String tempLocationKey = databaseName + '_' + tableName + "_1";
CarbonProperties.getInstance().addProperty(
tempLocationKey, storeLocation + "/" + databaseName + "/" + tableName);
CarbonProperties.getInstance().addProperty("store_output_location", outPutLoc);
CarbonProperties.getInstance().addProperty("send.signal.load", "false");
CarbonProperties.getInstance().addProperty("carbon.is.columnar.storage", "true");
CarbonProperties.getInstance().addProperty("carbon.dimension.split.value.in.columnar", "1");
CarbonProperties.getInstance().addProperty("carbon.is.fullyfilled.bits", "true");
CarbonProperties.getInstance().addProperty("is.int.based.indexer", "true");
CarbonProperties.getInstance().addProperty("aggregate.columnar.keyblock", "true");
CarbonProperties.getInstance().addProperty("is.compressed.keyblock", "false");
    String graphPath =
        outputLoc + File.separator + loadModel.getDatabaseName() + File.separator + tableName
            + File.separator + 0 + File.separator + 1 + File.separator + tableName + ".ktr";
File path = new File(graphPath);
if (path.exists()) {
if (!path.delete()) {
LOG.warn("delete " + path + " failed");
}
}
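    // expose the entire CSV file as a single local block for the CSV input format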
BlockDetails blockDetails = new BlockDetails(new Path(loadModel.getFactFilePath()),
0, new File(loadModel.getFactFilePath()).length(), new String[] {"localhost"});
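    // configure CSV parsing (comment char, delimiter, escape/quote chars, header handling)
    // from the load model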
Configuration configuration = new Configuration();
CSVInputFormat.setCommentCharacter(configuration, loadModel.getCommentChar());
CSVInputFormat.setCSVDelimiter(configuration, loadModel.getCsvDelimiter());
CSVInputFormat.setEscapeCharacter(configuration, loadModel.getEscapeChar());
CSVInputFormat.setHeaderExtractionEnabled(configuration, true);
CSVInputFormat.setQuoteCharacter(configuration, loadModel.getQuoteChar());
CSVInputFormat.setReadBufferSize(configuration,
CarbonProperties.getInstance().getProperty(
CarbonCommonConstants.CSV_READ_BUFFER_SIZE,
CarbonCommonConstants.CSV_READ_BUFFER_SIZE_DEFAULT));
CSVInputFormat.setNumberOfColumns(
configuration, String.valueOf(loadModel.getCsvHeaderColumns().length));
CSVInputFormat.setMaxColumns(configuration, "10");
CSVInputFormat.setLineSeparator(configuration, loadModel.getLineSeparator());
TaskAttemptContextImpl hadoopAttemptContext =
new TaskAttemptContextImpl(configuration, new TaskAttemptID("", 1, TaskType.MAP, 0, 0));
CSVInputFormat format = new CSVInputFormat();
RecordReader<NullWritable, StringArrayWritable> recordReader =
format.createRecordReader(blockDetails, hadoopAttemptContext);
CSVRecordReaderIterator readerIterator =
new CSVRecordReaderIterator(recordReader, blockDetails, hadoopAttemptContext);
DataTypeUtil.clearFormatter();
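    // execute the actual load: consume CSV rows through the iterator and write
    // carbondata files under <store>/<db>/<table>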
new DataLoadExecutor().execute(loadModel,
new String[] {storeLocation + "/" + databaseName + "/" + tableName},
new CarbonIterator[]{readerIterator});
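    // mark segment 0 as successfully loaded in the table status file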
    writeLoadMetadata(
        loadModel.getCarbonDataLoadSchema(), loadModel.getDatabaseName(), loadModel.getTableName(),
        new ArrayList<>());
}
public static void writeLoadMetadata(CarbonDataLoadSchema schema, String databaseName,
String tableName, List<LoadMetadataDetails> listOfLoadFolderDetails) throws IOException {
LoadMetadataDetails loadMetadataDetails = new LoadMetadataDetails();
loadMetadataDetails.setLoadEndTime(System.currentTimeMillis());
loadMetadataDetails.setSegmentStatus(SegmentStatus.SUCCESS);
loadMetadataDetails.setLoadName(String.valueOf(0));
loadMetadataDetails.setLoadStartTime(loadMetadataDetails.getTimeStamp(readCurrentTime()));
listOfLoadFolderDetails.add(loadMetadataDetails);
String dataLoadLocation = schema.getCarbonTable().getMetadataPath() + File.separator
+ CarbonTablePath.TABLE_STATUS_FILE;
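    // the table status file stores a JSON array of LoadMetadataDetails,
    // written through an atomic file operation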
DataOutputStream dataOutputStream;
Gson gsonObjectToWrite = new Gson();
BufferedWriter brWriter = null;
AtomicFileOperations writeOperation =
AtomicFileOperationFactory.getAtomicFileOperations(dataLoadLocation);
try {
dataOutputStream = writeOperation.openForWrite(FileWriteOperation.OVERWRITE);
brWriter = new BufferedWriter(new OutputStreamWriter(dataOutputStream,
Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)));
String metadataInstance = gsonObjectToWrite.toJson(listOfLoadFolderDetails.toArray());
brWriter.write(metadataInstance);
} catch (IOException ioe) {
LOG.error("Error message: " + ioe.getLocalizedMessage());
writeOperation.setFailed();
throw ioe;
} finally {
if (null != brWriter) {
brWriter.flush();
}
CarbonUtil.closeStreams(brWriter);
}
writeOperation.close();
}
public static String readCurrentTime() {
SimpleDateFormat sdf = new SimpleDateFormat(CarbonCommonConstants.CARBON_TIMESTAMP_MILLIS);
return sdf.format(new Date());
}
public static void main(String[] args) throws Exception {
new StoreCreator(new File("target/store").getAbsolutePath(),
new File("../hadoop/src/test/resources/data.csv").getCanonicalPath()).createCarbonStore();
}
}