/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.carbondata.hadoop.testutil;

import java.io.BufferedWriter;
import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.nio.charset.Charset;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.UUID;

import org.apache.carbondata.common.CarbonIterator;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datastore.compression.CompressorFactory;
import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.fileoperations.AtomicFileOperationFactory;
import org.apache.carbondata.core.fileoperations.AtomicFileOperations;
import org.apache.carbondata.core.fileoperations.FileWriteOperation;
import org.apache.carbondata.core.index.IndexStoreManager;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
import org.apache.carbondata.core.metadata.CarbonMetadata;
import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
import org.apache.carbondata.core.metadata.converter.SchemaConverter;
import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl;
import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.metadata.datatype.DataTypes;
import org.apache.carbondata.core.metadata.encoder.Encoding;
import org.apache.carbondata.core.metadata.schema.SchemaEvolution;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.metadata.schema.table.TableInfo;
import org.apache.carbondata.core.metadata.schema.table.TableSchema;
import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
import org.apache.carbondata.core.statusmanager.SegmentStatus;
import org.apache.carbondata.core.util.CarbonProperties;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.core.util.DataTypeUtil;
import org.apache.carbondata.core.util.path.CarbonTablePath;
import org.apache.carbondata.core.writer.ThriftWriter;
import org.apache.carbondata.processing.loading.DataLoadExecutor;
import org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants;
import org.apache.carbondata.processing.loading.csvinput.BlockDetails;
import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat;
import org.apache.carbondata.processing.loading.csvinput.CSVRecordReaderIterator;
import org.apache.carbondata.processing.loading.csvinput.StringArrayWritable;
import org.apache.carbondata.processing.loading.model.CarbonDataLoadSchema;
import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
import org.apache.carbondata.processing.util.TableOptionConstant;

import com.google.gson.Gson;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.TaskAttemptID;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
import org.apache.log4j.Logger;

/**
 * Test utility that creates a CarbonData store at a given location: it defines a sample
 * table schema, writes the schema file and loads data into the table from a CSV file.
 *
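 * A minimal usage sketch (paths are illustrative):
 * <pre>
 *   StoreCreator creator = new StoreCreator("/tmp/carbon-store", "/path/to/data.csv");
 *   CarbonLoadModel loadModel = creator.createCarbonStore();
 * </pre>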
 */
public class StoreCreator {

  private static final Logger LOG =
      LogServiceFactory.getLogService(StoreCreator.class.getCanonicalName());
  private final AbsoluteTableIdentifier absoluteTableIdentifier;
  private final String storePath;
  private final String csvPath;
  private List<String> sortColumns = new ArrayList<>();

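  /**
   * Prepares a creator for the fixed test table testdb.testtable with a default set of
   * sort columns that matches the test CSV header.
   *
   * @param storePath base directory under which the store is created
   * @param csvPath   path to the CSV file used as fact data
   */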
  public StoreCreator(String storePath, String csvPath) {
    this.storePath = storePath;
    this.csvPath = csvPath;
    String dbName = "testdb";
    String tableName = "testtable";
    sortColumns.add("date");
    sortColumns.add("country");
    sortColumns.add("name");
    sortColumns.add("phonetype");
    sortColumns.add("serialname");
    absoluteTableIdentifier = AbsoluteTableIdentifier.from(
        storePath + "/" + dbName + "/" + tableName,
        new CarbonTableIdentifier(dbName, tableName, UUID.randomUUID().toString()));
  }

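  /**
   * @return the identifier (table path plus database/table name) of the test table
   */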
  public AbsoluteTableIdentifier getAbsoluteTableIdentifier() {
    return absoluteTableIdentifier;
  }

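  /**
   * Builds a {@link CarbonLoadModel} for loading the given CSV file into the given table,
   * using default formats, bad record handling and compressor settings.
   *
   * @param table                   table to load into
   * @param factFilePath            path of the CSV fact file
   * @param absoluteTableIdentifier identifier of the target table
   * @return a load model ready to be passed to the data load executor
   */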
  public static CarbonLoadModel buildCarbonLoadModel(CarbonTable table, String factFilePath,
      AbsoluteTableIdentifier absoluteTableIdentifier) {
    CarbonDataLoadSchema schema = new CarbonDataLoadSchema(table);
    CarbonLoadModel loadModel = new CarbonLoadModel();
    String columnCompressor = table.getTableInfo().getFactTable().getTableProperties().get(
        CarbonCommonConstants.COMPRESSOR);
    if (columnCompressor == null) {
      columnCompressor = CompressorFactory.getInstance().getCompressor().getName();
    }
    loadModel.setColumnCompressor(columnCompressor);
    loadModel.setCarbonDataLoadSchema(schema);
    loadModel.setDatabaseName(absoluteTableIdentifier.getCarbonTableIdentifier().getDatabaseName());
    loadModel.setTableName(absoluteTableIdentifier.getCarbonTableIdentifier().getTableName());
    loadModel.setFactFilePath(factFilePath);
    loadModel.setLoadMetadataDetails(new ArrayList<>());
    loadModel.setTablePath(absoluteTableIdentifier.getTablePath());
    loadModel.setDateFormat(null);
    loadModel.setCarbonTransactionalTable(table.isTransactionalTable());
    loadModel.setDefaultTimestampFormat(CarbonProperties.getInstance().getProperty(
        CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
        CarbonCommonConstants.CARBON_TIMESTAMP_MILLIS));
    loadModel.setDefaultDateFormat(CarbonProperties.getInstance().getProperty(
        CarbonCommonConstants.CARBON_DATE_FORMAT,
        CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT));
    loadModel.setSerializationNullFormat(
        TableOptionConstant.SERIALIZATION_NULL_FORMAT.getName() + ",\\N");
    loadModel.setBadRecordsLoggerEnable(
        TableOptionConstant.BAD_RECORDS_LOGGER_ENABLE.getName() + ",false");
    loadModel.setBadRecordsAction(
        TableOptionConstant.BAD_RECORDS_ACTION.getName() + ",FORCE");
    loadModel.setIsEmptyDataBadRecord(
        DataLoadProcessorConstants.IS_EMPTY_DATA_BAD_RECORD + ",false");
    loadModel.setCsvHeader("ID,date,country,name,phonetype,serialname,salary");
    loadModel.setCsvHeaderColumns(loadModel.getCsvHeader().split(","));
    loadModel.setTaskNo("0");
    loadModel.setSegmentId("0");
    loadModel.setFactTimeStamp(System.currentTimeMillis());
    loadModel.setMaxColumns("10");
    return loadModel;
  }

  /**
   * Create the table schema and load the CSV data into a fresh store (no restructure).
   */
  public CarbonLoadModel createCarbonStore() throws Exception {
    CarbonLoadModel loadModel = createTableAndLoadModel();
    loadData(loadModel, storePath);
    return loadModel;
  }

  /**
   * Load data into the store using an already prepared load model (no restructure).
   */
  public void createCarbonStore(CarbonLoadModel loadModel) throws Exception {
    loadData(loadModel, storePath);
  }

  /**
   * Clear the indexes cached for this table.
   */
  public void clearIndexes() {
    IndexStoreManager.getInstance().clearIndex(absoluteTableIdentifier);
  }

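  /**
   * Create the table schema and build a load model for it, optionally deleting any existing
   * store at the target location first.
   *
   * @param deleteOldStore true to delete the existing store directory before creating the table
   */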
  public CarbonLoadModel createTableAndLoadModel(boolean deleteOldStore) throws Exception {
    if (deleteOldStore) {
      File storeDir = new File(storePath);
      CarbonUtil.deleteFoldersAndFiles(storeDir);
    }

    CarbonTable table = createTable(absoluteTableIdentifier);
    return buildCarbonLoadModel(table, csvPath, absoluteTableIdentifier);
  }

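  /**
   * Same as {@link #createTableAndLoadModel(boolean)} with deletion of the old store enabled.
   */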
  public CarbonLoadModel createTableAndLoadModel() throws Exception {
    return createTableAndLoadModel(true);
  }

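  /**
   * Defines the schema of the test table (id, date, country, name, phonetype, serialname,
   * salary), registers it with {@link CarbonMetadata} and persists it as a Thrift schema file
   * under the table path.
   *
   * @param identifier identifier of the table to create
   * @return the created {@link CarbonTable}
   */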
  public CarbonTable createTable(AbsoluteTableIdentifier identifier) throws IOException {
    TableInfo tableInfo = new TableInfo();
    tableInfo.setDatabaseName(identifier.getCarbonTableIdentifier().getDatabaseName());
    TableSchema tableSchema = new TableSchema();
    tableSchema.setTableName(identifier.getCarbonTableIdentifier().getTableName());
    List<ColumnSchema> columnSchemas = new ArrayList<>();
    ArrayList<Encoding> encodings = new ArrayList<>();
    int schemaOrdinal = 0;
    addColumn(columnSchemas, encodings, schemaOrdinal++, "id", DataTypes.INT, true);
    addColumn(columnSchemas, encodings, schemaOrdinal++, "date", DataTypes.STRING, true);
    addColumn(columnSchemas, encodings, schemaOrdinal++, "country", DataTypes.STRING, true);
    addColumn(columnSchemas, encodings, schemaOrdinal++, "name", DataTypes.STRING, true);
    addColumn(columnSchemas, encodings, schemaOrdinal++, "phonetype", DataTypes.STRING, true);
    addColumn(columnSchemas, encodings, schemaOrdinal++, "serialname", DataTypes.STRING, true);
    addColumn(columnSchemas, encodings, schemaOrdinal, "salary", DataTypes.INT, false);
    // rearrange the column schemas based on the sort order, if sort columns exist
    List<ColumnSchema> rearrangedColumnSchemas = reArrangeColumnSchema(columnSchemas);
    tableSchema.setListOfColumns(rearrangedColumnSchemas);
    SchemaEvolution schemaEvolution = new SchemaEvolution();
    schemaEvolution.setSchemaEvolutionEntryList(new ArrayList<>());
    tableSchema.setSchemaEvolution(schemaEvolution);
    tableSchema.setTableId(UUID.randomUUID().toString());
    tableInfo.setTableUniqueName(identifier.getCarbonTableIdentifier().getTableUniqueName());
    tableInfo.setLastUpdatedTime(System.currentTimeMillis());
    tableInfo.setFactTable(tableSchema);
    tableInfo.setTablePath(identifier.getTablePath());
    String schemaFilePath = CarbonTablePath.getSchemaFilePath(identifier.getTablePath());
    String schemaMetadataPath = CarbonTablePath.getFolderContainingFile(schemaFilePath);
    CarbonMetadata.getInstance().loadTableMetadata(tableInfo);

    SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl();
    org.apache.carbondata.format.TableInfo thriftTableInfo =
        schemaConverter.fromWrapperToExternalTableInfo(
            tableInfo,
            tableInfo.getDatabaseName(),
            tableInfo.getFactTable().getTableName());
    org.apache.carbondata.format.SchemaEvolutionEntry schemaEvolutionEntry =
        new org.apache.carbondata.format.SchemaEvolutionEntry(tableInfo.getLastUpdatedTime());
    thriftTableInfo.getFact_table().getSchema_evolution().getSchema_evolution_history()
        .add(schemaEvolutionEntry);

    if (!FileFactory.isFileExist(schemaMetadataPath)) {
      FileFactory.mkdirs(schemaMetadataPath);
    }

    ThriftWriter thriftWriter = new ThriftWriter(schemaFilePath, false);
    thriftWriter.open();
    thriftWriter.write(thriftTableInfo);
    thriftWriter.close();
    return CarbonMetadata.getInstance().getCarbonTable(tableInfo.getTableUniqueName());
  }

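  /**
   * Creates a {@link ColumnSchema} for one column of the test table and appends it to the list.
   * Columns listed in {@link #sortColumns} are flagged as sort columns.
   */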
  private void addColumn(List<ColumnSchema> columnSchemas, ArrayList<Encoding> encodings,
      int schemaOrdinal, String columnName, DataType dataType, boolean isDimensionColumn) {
    ColumnSchema columnSchema = new ColumnSchema();
    columnSchema.setColumnName(columnName);
    columnSchema.setDataType(dataType);
    columnSchema.setEncodingList(encodings);
    columnSchema.setColumnUniqueId(UUID.randomUUID().toString());
    columnSchema.setColumnReferenceId(columnSchema.getColumnUniqueId());
    columnSchema.setDimensionColumn(isDimensionColumn);
    columnSchema.setSchemaOrdinal(schemaOrdinal);
    if (sortColumns.contains(columnSchema.getColumnName())) {
      columnSchema.setSortColumn(true);
    }
    columnSchemas.add(columnSchema);
  }

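  /**
   * Rearranges the column schemas into CarbonData's expected order: sort columns first,
   * then the remaining dimension columns, then measure columns.
   */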
  private List<ColumnSchema> reArrangeColumnSchema(List<ColumnSchema> columnSchemas) {
    List<ColumnSchema> newColumnSchema = new ArrayList<>(columnSchemas.size());
    // add sort columns first
    for (ColumnSchema columnSchema : columnSchemas) {
      if (columnSchema.isSortColumn()) {
        newColumnSchema.add(columnSchema);
      }
    }
    // add other dimension columns
    for (ColumnSchema columnSchema : columnSchemas) {
      if (!columnSchema.isSortColumn() && columnSchema.isDimensionColumn()) {
        newColumnSchema.add(columnSchema);
      }
    }
    // add measure columns
    for (ColumnSchema columnSchema : columnSchemas) {
      if (!columnSchema.isDimensionColumn()) {
        newColumnSchema.add(columnSchema);
      }
    }
    return newColumnSchema;
  }

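  /**
   * Overrides the default sort columns. Call this before {@link #createTable} for it
   * to take effect.
   */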
  public void setSortColumns(List<String> sortColumns) {
    this.sortColumns = sortColumns;
  }

  /**
   * Executes the data load: reads the CSV described by the load model and writes
   * CarbonData files and load metadata into the store at the given location.
   */
  public static void loadData(CarbonLoadModel loadModel, String storeLocation)
      throws Exception {
    File storeDir = new File(storeLocation);
    if (!storeDir.exists() && !storeDir.mkdirs()) {
      LOG.warn("failed to create store directory: " + storeLocation);
    }
    String outPutLoc = storeLocation + "/etl";
    String databaseName = loadModel.getDatabaseName();
    String tableName = loadModel.getTableName();
    String tempLocationKey = databaseName + '_' + tableName + "_1";
    CarbonProperties.getInstance().addProperty(
        tempLocationKey, storeLocation + "/" + databaseName + "/" + tableName);
    CarbonProperties.getInstance().addProperty("store_output_location", outPutLoc);
    CarbonProperties.getInstance().addProperty("send.signal.load", "false");
    CarbonProperties.getInstance().addProperty("carbon.is.columnar.storage", "true");
    CarbonProperties.getInstance().addProperty("carbon.dimension.split.value.in.columnar", "1");
    CarbonProperties.getInstance().addProperty("carbon.is.fullyfilled.bits", "true");
    CarbonProperties.getInstance().addProperty("is.int.based.indexer", "true");
    CarbonProperties.getInstance().addProperty("aggregate.columnar.keyblock", "true");
    CarbonProperties.getInstance().addProperty("is.compressed.keyblock", "false");

    String graphPath =
        outPutLoc + File.separator + loadModel.getDatabaseName() + File.separator + tableName
            + File.separator + 0 + File.separator + 1 + File.separator + tableName + ".ktr";
    File path = new File(graphPath);
    if (path.exists()) {
      if (!path.delete()) {
        LOG.warn("failed to delete " + path);
      }
    }

    BlockDetails blockDetails = new BlockDetails(new Path(loadModel.getFactFilePath()),
        0, new File(loadModel.getFactFilePath()).length(), new String[] {"localhost"});
    Configuration configuration = new Configuration();
    CSVInputFormat.setCommentCharacter(configuration, loadModel.getCommentChar());
    CSVInputFormat.setCSVDelimiter(configuration, loadModel.getCsvDelimiter());
    CSVInputFormat.setEscapeCharacter(configuration, loadModel.getEscapeChar());
    CSVInputFormat.setHeaderExtractionEnabled(configuration, true);
    CSVInputFormat.setQuoteCharacter(configuration, loadModel.getQuoteChar());
    CSVInputFormat.setReadBufferSize(configuration,
        CarbonProperties.getInstance().getProperty(
            CarbonCommonConstants.CSV_READ_BUFFER_SIZE,
            CarbonCommonConstants.CSV_READ_BUFFER_SIZE_DEFAULT));
    CSVInputFormat.setNumberOfColumns(
        configuration, String.valueOf(loadModel.getCsvHeaderColumns().length));
    CSVInputFormat.setMaxColumns(configuration, "10");
    CSVInputFormat.setLineSeparator(configuration, loadModel.getLineSeparator());

    TaskAttemptContextImpl hadoopAttemptContext =
        new TaskAttemptContextImpl(configuration, new TaskAttemptID("", 1, TaskType.MAP, 0, 0));
    CSVInputFormat format = new CSVInputFormat();

    RecordReader<NullWritable, StringArrayWritable> recordReader =
        format.createRecordReader(blockDetails, hadoopAttemptContext);

    CSVRecordReaderIterator readerIterator =
        new CSVRecordReaderIterator(recordReader, blockDetails, hadoopAttemptContext);
    DataTypeUtil.clearFormatter();
    new DataLoadExecutor().execute(loadModel,
        new String[] {storeLocation + "/" + databaseName + "/" + tableName},
        new CarbonIterator[] {readerIterator});

    writeLoadMetadata(loadModel.getCarbonDataLoadSchema(), loadModel.getDatabaseName(),
        loadModel.getTableName(), new ArrayList<>());
  }

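  /**
   * Writes a table status file marking segment 0 as successfully loaded, so that the
   * generated store can be read back by queries.
   */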
  public static void writeLoadMetadata(CarbonDataLoadSchema schema, String databaseName,
      String tableName, List<LoadMetadataDetails> listOfLoadFolderDetails) throws IOException {
    LoadMetadataDetails loadMetadataDetails = new LoadMetadataDetails();
    loadMetadataDetails.setLoadEndTime(System.currentTimeMillis());
    loadMetadataDetails.setSegmentStatus(SegmentStatus.SUCCESS);
    loadMetadataDetails.setLoadName(String.valueOf(0));
    loadMetadataDetails.setLoadStartTime(loadMetadataDetails.getTimeStamp(readCurrentTime()));
    listOfLoadFolderDetails.add(loadMetadataDetails);

    String dataLoadLocation = schema.getCarbonTable().getMetadataPath() + File.separator
        + CarbonTablePath.TABLE_STATUS_FILE;

    DataOutputStream dataOutputStream;
    Gson gsonObjectToWrite = new Gson();
    BufferedWriter brWriter = null;

    AtomicFileOperations writeOperation =
        AtomicFileOperationFactory.getAtomicFileOperations(dataLoadLocation);

    try {
      dataOutputStream = writeOperation.openForWrite(FileWriteOperation.OVERWRITE);
      brWriter = new BufferedWriter(new OutputStreamWriter(dataOutputStream,
          Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)));

      String metadataInstance = gsonObjectToWrite.toJson(listOfLoadFolderDetails.toArray());
      brWriter.write(metadataInstance);
    } catch (IOException ioe) {
      LOG.error("Error message: " + ioe.getLocalizedMessage());
      writeOperation.setFailed();
      throw ioe;
    } finally {
      if (null != brWriter) {
        brWriter.flush();
      }
      CarbonUtil.closeStreams(brWriter);
    }
    writeOperation.close();
  }

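  /**
   * @return the current time formatted with the Carbon timestamp-with-millis pattern
   */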
  public static String readCurrentTime() {
    SimpleDateFormat sdf = new SimpleDateFormat(CarbonCommonConstants.CARBON_TIMESTAMP_MILLIS);
    return sdf.format(new Date());
  }

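  /**
   * Convenience entry point: creates a store under target/store from the bundled test CSV.
   */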
  public static void main(String[] args) throws Exception {
    new StoreCreator(new File("target/store").getAbsolutePath(),
        new File("../hadoop/src/test/resources/data.csv").getCanonicalPath()).createCarbonStore();
  }

}