| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.carbondata.processing.store; |
| |
| import java.io.ByteArrayOutputStream; |
| import java.io.DataOutputStream; |
| import java.io.IOException; |
| import java.nio.ByteBuffer; |
| import java.util.ArrayList; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.concurrent.Callable; |
| import java.util.concurrent.ExecutionException; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.Future; |
| import java.util.concurrent.Semaphore; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import java.util.concurrent.atomic.AtomicInteger; |
| |
| import org.apache.carbondata.common.logging.LogServiceFactory; |
| import org.apache.carbondata.core.constants.CarbonCommonConstants; |
| import org.apache.carbondata.core.constants.CarbonV3DataFormatConstants; |
| import org.apache.carbondata.core.datastore.compression.SnappyCompressor; |
| import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException; |
| import org.apache.carbondata.core.datastore.row.CarbonRow; |
| import org.apache.carbondata.core.datastore.row.WriteStepRowUtil; |
| import org.apache.carbondata.core.keygenerator.KeyGenException; |
| import org.apache.carbondata.core.keygenerator.columnar.impl.MultiDimKeyVarLengthEquiSplitGenerator; |
| import org.apache.carbondata.core.memory.MemoryException; |
| import org.apache.carbondata.core.metadata.ColumnarFormatVersion; |
| import org.apache.carbondata.core.metadata.datatype.DataType; |
| import org.apache.carbondata.core.metadata.datatype.DataTypes; |
| import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension; |
| import org.apache.carbondata.core.util.CarbonProperties; |
| import org.apache.carbondata.core.util.CarbonThreadFactory; |
| import org.apache.carbondata.core.util.CarbonUtil; |
| import org.apache.carbondata.processing.datatypes.GenericDataType; |
| import org.apache.carbondata.processing.store.writer.CarbonFactDataWriter; |
| |
| import org.apache.log4j.Logger; |
| |
| /** |
| * Fact data handler class to handle the fact data |
| */ |
| public class CarbonFactDataHandlerColumnar implements CarbonFactHandler { |
| |
| /** |
| * LOGGER |
| */ |
| private static final Logger LOGGER = |
| LogServiceFactory.getLogService(CarbonFactDataHandlerColumnar.class.getName()); |
| |
| private CarbonFactDataHandlerModel model; |
| |
| /** |
| * data writer |
| */ |
| private CarbonFactDataWriter dataWriter; |
| |
| /** |
| * total number of entries in blocklet |
| */ |
| private int entryCount; |
| |
| /** |
| * blocklet size (for V1 and V2) or page size (for V3). A Producer thread will start to process |
| * once this size of input is reached |
| */ |
| private int pageSize; |
| |
| private long processedDataCount; |
| private ExecutorService producerExecutorService; |
| private List<Future<Void>> producerExecutorServiceTaskList; |
| private ExecutorService consumerExecutorService; |
| private List<Future<Void>> consumerExecutorServiceTaskList; |
| private List<CarbonRow> dataRows; |
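  /**
   * running byte size accumulated per no-dictionary column page,
   * used for byte-size based page cutting
   */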
| private int[] noDictColumnPageSize; |
| /** |
| * semaphore which will used for managing node holder objects |
| */ |
| private Semaphore semaphore; |
| /** |
| * counter that incremented for every job submitted to data writer thread |
| */ |
| private int writerTaskSequenceCounter; |
| /** |
| * a private class that will hold the data for blocklets |
| */ |
| private TablePageList tablePageList; |
| /** |
| * number of cores configured |
| */ |
| private int numberOfCores; |
| /** |
| * integer that will be incremented for every new blocklet submitted to producer for processing |
| * the data and decremented every time consumer fetches the blocklet for writing |
| */ |
| private AtomicInteger blockletProcessingCount; |
| /** |
| * flag to check whether all blocklets have been finished writing |
| */ |
| private boolean processingComplete; |
| |
| /** |
| * current data format version |
| */ |
| private ColumnarFormatVersion version; |
| |
| /* |
| * cannot use the indexMap of model directly, |
| * modifying map in model directly will create problem if accessed later, |
| * Hence take a copy and work on it. |
| * */ |
| private Map<Integer, GenericDataType> complexIndexMapCopy = null; |
| |
  /* configured page size, converted from MB to bytes */
| private int configuredPageSizeInBytes = 0; |
| |
| /** |
| * CarbonFactDataHandler constructor |
| */ |
| public CarbonFactDataHandlerColumnar(CarbonFactDataHandlerModel model) { |
| this.model = model; |
| initParameters(model); |
| this.version = CarbonProperties.getInstance().getFormatVersion(); |
    StringBuilder noInvertedIdxCol = new StringBuilder();
| for (CarbonDimension cd : model.getSegmentProperties().getDimensions()) { |
| if (!cd.isUseInvertedIndex()) { |
| noInvertedIdxCol.append(cd.getColName()).append(","); |
| } |
| } |
| |
| if (LOGGER.isDebugEnabled()) { |
| LOGGER.debug("Columns considered as NoInverted Index are " + noInvertedIdxCol.toString()); |
| } |
| this.complexIndexMapCopy = new HashMap<>(); |
| for (Map.Entry<Integer, GenericDataType> entry: model.getComplexIndexMap().entrySet()) { |
| this.complexIndexMapCopy.put(entry.getKey(), entry.getValue().deepCopy()); |
| } |
    String pageSizeStrInMB =
        model.getTableSpec().getCarbonTable().getTableInfo().getFactTable().getTableProperties()
            .get(CarbonCommonConstants.TABLE_PAGE_SIZE_INMB);
    if (pageSizeStrInMB != null) {
      configuredPageSizeInBytes = Integer.parseInt(pageSizeStrInMB) * 1024 * 1024;
    }
| } |
| |
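  /**
   * Sets up the producer/consumer pipeline: up to numberOfCores producer threads encode
   * pages in parallel while a single consumer writes them to the file in sequence order.
   */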
| private void initParameters(CarbonFactDataHandlerModel model) { |
| this.numberOfCores = model.getNumberOfCores(); |
| blockletProcessingCount = new AtomicInteger(0); |
| producerExecutorService = Executors.newFixedThreadPool(model.getNumberOfCores(), |
| new CarbonThreadFactory( |
| String.format("ProducerPool:%s, range: %d", |
| model.getTableName(), model.getBucketId()), true)); |
| producerExecutorServiceTaskList = |
| new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); |
| LOGGER.debug("Initializing writer executors"); |
| consumerExecutorService = Executors.newFixedThreadPool(1, new CarbonThreadFactory( |
| String.format("ConsumerPool:%s, range: %d", |
| model.getTableName(), model.getBucketId()), true)); |
| consumerExecutorServiceTaskList = new ArrayList<>(1); |
| semaphore = new Semaphore(numberOfCores); |
| tablePageList = new TablePageList(); |
| |
| // Start the consumer which will take each blocklet/page in order and write to a file |
| Consumer consumer = new Consumer(tablePageList); |
| consumerExecutorServiceTaskList.add(consumerExecutorService.submit(consumer)); |
| } |
| |
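  /**
   * Assigns sequential surrogate indexes to the dictionary-encoded primitive children of
   * complex columns; plain dimensions each consume one index.
   */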
| private void setComplexMapSurrogateIndex(int dimensionCount) { |
| int surrIndex = 0; |
| for (int i = 0; i < dimensionCount; i++) { |
| GenericDataType complexDataType = model.getComplexIndexMap().get(i); |
| if (complexDataType != null) { |
| List<GenericDataType> primitiveTypes = new ArrayList<GenericDataType>(); |
| complexDataType.getAllPrimitiveChildren(primitiveTypes); |
| for (GenericDataType eachPrimitive : primitiveTypes) { |
| if (eachPrimitive.getIsColumnDictionary()) { |
| eachPrimitive.setSurrogateIndex(surrIndex++); |
| } |
| } |
| } else { |
| surrIndex++; |
| } |
| } |
| } |
| |
| /** |
| * This method will be used to get and update the step properties which will |
| * required to run this step |
| * |
| * @throws CarbonDataWriterException |
| */ |
| public void initialise() throws CarbonDataWriterException { |
| setWritingConfiguration(); |
| } |
| |
| /** |
| * below method will be used to add row to store |
| * |
| * @param row |
| * @throws CarbonDataWriterException |
| */ |
| public void addDataToStore(CarbonRow row) throws CarbonDataWriterException { |
| int totalComplexColumnDepth = setFlatCarbonRowForComplex(row); |
| if (noDictColumnPageSize == null) { |
| // initialization using first row. |
| model.setNoDictAllComplexColumnDepth(totalComplexColumnDepth); |
| if (model.getNoDictDataTypesList().size() + model.getNoDictAllComplexColumnDepth() > 0) { |
| noDictColumnPageSize = |
| new int[model.getNoDictDataTypesList().size() + model.getNoDictAllComplexColumnDepth()]; |
| } |
| } |
| |
| dataRows.add(row); |
| this.entryCount++; |
    // once the entry count reaches the page size, or the page must be cut early because of
    // its byte size, submit the collected rows to a producer for processing
| if (this.entryCount == this.pageSize || needToCutThePage(row)) { |
| try { |
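        // block if numberOfCores pages are already in flight; the consumer releases a permit
        // after each page is written, which bounds the memory used by pending pages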
| semaphore.acquire(); |
| |
| producerExecutorServiceTaskList.add( |
| producerExecutorService.submit( |
| new Producer(tablePageList, dataRows, ++writerTaskSequenceCounter, false) |
| ) |
| ); |
| blockletProcessingCount.incrementAndGet(); |
        // accumulate the processed row count and reset state for the next page
        processedDataCount += entryCount;
| |
| if (LOGGER.isDebugEnabled()) { |
| LOGGER.debug("Total Number Of records added to store: " + processedDataCount); |
| } |
| dataRows = new ArrayList<>(this.pageSize); |
| this.entryCount = 0; |
| // re-init the complexIndexMap |
| this.complexIndexMapCopy = new HashMap<>(); |
| for (Map.Entry<Integer, GenericDataType> entry : model.getComplexIndexMap().entrySet()) { |
| this.complexIndexMapCopy.put(entry.getKey(), entry.getValue().deepCopy()); |
| } |
| noDictColumnPageSize = |
| new int[model.getNoDictDataTypesList().size() + model.getNoDictAllComplexColumnDepth()]; |
| } catch (InterruptedException e) { |
| LOGGER.error(e.getMessage(), e); |
| throw new CarbonDataWriterException(e); |
| } |
| } |
| } |
| |
| /** |
| * Check if column page can be added more rows after adding this row to page. |
| * only few no-dictionary dimensions columns (string, varchar, |
| * complex columns) can grow huge in size. |
| * |
| * |
| * @param row carbonRow |
| * @return false if next rows can be added to same page. |
| * true if next rows cannot be added to same page |
| */ |
| private boolean needToCutThePage(CarbonRow row) { |
| List<DataType> noDictDataTypesList = model.getNoDictDataTypesList(); |
| int totalNoDictPageCount = noDictDataTypesList.size() + model.getNoDictAllComplexColumnDepth(); |
| if (totalNoDictPageCount > 0) { |
| int currentElementLength; |
| int bucketCounter = 0; |
| if (configuredPageSizeInBytes == 0) { |
        // table page size is not configured for this table, so pages are cut only by the
        // row-count limit; no need to cut the page here
        return false;
| } |
| Object[] nonDictArray = WriteStepRowUtil.getNoDictAndComplexDimension(row); |
| for (int i = 0; i < noDictDataTypesList.size(); i++) { |
| DataType columnType = noDictDataTypesList.get(i); |
        if (columnType == DataTypes.STRING || columnType == DataTypes.VARCHAR
            || columnType == DataTypes.BINARY) {
| currentElementLength = ((byte[]) nonDictArray[i]).length; |
| noDictColumnPageSize[bucketCounter] += currentElementLength; |
| canSnappyHandleThisRow(noDictColumnPageSize[bucketCounter]); |
| // If current page size is more than configured page size, cut the page here. |
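          // the dataRows.size() * 4 term approximates the per-row length/offset overhead
          // that each value adds to the encoded page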
| if (noDictColumnPageSize[bucketCounter] + dataRows.size() * 4 |
| >= configuredPageSizeInBytes) { |
| if (LOGGER.isDebugEnabled()) { |
| LOGGER.debug("cutting the page. Rows count in this page: " + dataRows.size()); |
| } |
| // re-init for next page |
| noDictColumnPageSize = new int[totalNoDictPageCount]; |
| return true; |
| } |
| bucketCounter++; |
| } else if (columnType.isComplexType()) { |
          // the model only stores the total depth; get this column's own depth from its
          // copied GenericDataType
| GenericDataType genericDataType = complexIndexMapCopy |
| .get(i - model.getNoDictionaryCount() + model.getPrimitiveDimLens().length); |
| int depth = genericDataType.getDepth(); |
| List<ArrayList<byte[]>> flatComplexColumnList = (List<ArrayList<byte[]>>) nonDictArray[i]; |
| for (int k = 0; k < depth; k++) { |
| ArrayList<byte[]> children = flatComplexColumnList.get(k); |
| // Add child element from inner list. |
| int complexElementSize = 0; |
| for (byte[] child : children) { |
| complexElementSize += child.length; |
| } |
| noDictColumnPageSize[bucketCounter] += complexElementSize; |
| canSnappyHandleThisRow(noDictColumnPageSize[bucketCounter]); |
| // If current page size is more than configured page size, cut the page here. |
| if (noDictColumnPageSize[bucketCounter] + dataRows.size() * 4 |
| >= configuredPageSizeInBytes) { |
| LOGGER.info("cutting the page. Rows count: " + dataRows.size()); |
| // re-init for next page |
| noDictColumnPageSize = new int[totalNoDictPageCount]; |
| return true; |
| } |
| bucketCounter++; |
| } |
| } |
| } |
| } |
| return false; |
| } |
| |
| private int setFlatCarbonRowForComplex(CarbonRow row) { |
| int noDictTotalComplexChildDepth = 0; |
| Object[] noDictAndComplexDimension = WriteStepRowUtil.getNoDictAndComplexDimension(row); |
| for (int i = 0; i < noDictAndComplexDimension.length; i++) { |
      // complex types start after the no-dictionary dimensions
| if (i >= model.getNoDictionaryCount() && (model.getTableSpec().getNoDictionaryDimensionSpec() |
| .get(i).getSchemaDataType().isComplexType())) { |
        // the model only stores the total depth; get this column's own depth from its
        // copied GenericDataType
| GenericDataType genericDataType = complexIndexMapCopy |
| .get(i - model.getNoDictionaryCount() + model.getPrimitiveDimLens().length); |
| int depth = genericDataType.getDepth(); |
| // initialize flatComplexColumnList |
| List<ArrayList<byte[]>> flatComplexColumnList = new ArrayList<>(depth); |
| for (int k = 0; k < depth; k++) { |
| flatComplexColumnList.add(new ArrayList<byte[]>()); |
| } |
| // flatten the complex byteArray as per depth |
| try { |
| ByteBuffer byteArrayInput = ByteBuffer.wrap((byte[])noDictAndComplexDimension[i]); |
| ByteArrayOutputStream byteArrayOutput = new ByteArrayOutputStream(); |
| DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutput); |
| genericDataType.parseComplexValue(byteArrayInput, dataOutputStream, |
| model.getComplexDimensionKeyGenerator()); |
| genericDataType.getColumnarDataForComplexType(flatComplexColumnList, |
| ByteBuffer.wrap(byteArrayOutput.toByteArray())); |
| byteArrayOutput.close(); |
| } catch (IOException | KeyGenException e) { |
| throw new CarbonDataWriterException("Problem in splitting and writing complex data", e); |
| } |
| noDictTotalComplexChildDepth += flatComplexColumnList.size(); |
| // update the complex column data with the flat data |
| noDictAndComplexDimension[i] = flatComplexColumnList; |
| } |
| } |
| return noDictTotalComplexChildDepth; |
| } |
| |
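  /**
   * Validates that the accumulated column page size is still within the maximum number of
   * bytes that snappy can compress in a single call; throws a RuntimeException otherwise.
   */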
| private void canSnappyHandleThisRow(int currentRowSize) { |
| if (currentRowSize > SnappyCompressor.MAX_BYTE_TO_COMPRESS) { |
| throw new RuntimeException(" page size: " + currentRowSize + " exceed snappy size: " |
| + SnappyCompressor.MAX_BYTE_TO_COMPRESS + " Bytes. Snappy cannot compress it "); |
| } |
| } |
| |
| /** |
| * generate the EncodedTablePage from the input rows (one page in case of V3 format) |
| */ |
| private TablePage processDataRows(List<CarbonRow> dataRows) |
| throws CarbonDataWriterException, KeyGenException, MemoryException, IOException { |
| if (dataRows.size() == 0) { |
| return new TablePage(model, 0); |
| } |
| TablePage tablePage = new TablePage(model, dataRows.size()); |
| int rowId = 0; |
| |
| // convert row to columnar data |
| for (CarbonRow row : dataRows) { |
| tablePage.addRow(rowId++, row); |
| } |
| |
| tablePage.encode(); |
| |
| if (LOGGER.isDebugEnabled()) { |
| LOGGER.debug("Number Of records processed: " + dataRows.size()); |
| } |
| return tablePage; |
| } |
| |
| /** |
| * below method will be used to finish the data handler |
| * |
| * @throws CarbonDataWriterException |
| */ |
| public void finish() throws CarbonDataWriterException { |
| // still some data is present in stores if entryCount is more |
| // than 0 |
| if (null == dataWriter) { |
| return; |
| } |
| if (producerExecutorService.isShutdown()) { |
| return; |
| } |
| if (LOGGER.isDebugEnabled()) { |
| LOGGER.debug("Started Finish Operation"); |
| } |
| try { |
| semaphore.acquire(); |
| producerExecutorServiceTaskList.add(producerExecutorService |
| .submit(new Producer(tablePageList, dataRows, ++writerTaskSequenceCounter, true))); |
| blockletProcessingCount.incrementAndGet(); |
| processedDataCount += entryCount; |
| if (LOGGER.isDebugEnabled()) { |
| LOGGER.debug("Total Number Of records added to store: " + processedDataCount); |
| } |
| closeWriterExecutionService(producerExecutorService); |
| processWriteTaskSubmitList(producerExecutorServiceTaskList); |
| processingComplete = true; |
| } catch (InterruptedException e) { |
| LOGGER.error(e.getMessage(), e); |
| throw new CarbonDataWriterException(e); |
| } |
| } |
| |
| /** |
| * This method will close writer execution service and get the node holders and |
| * add them to node holder list |
| * |
| * @param service the service to shutdown |
| * @throws CarbonDataWriterException |
| */ |
| private void closeWriterExecutionService(ExecutorService service) |
| throws CarbonDataWriterException { |
| try { |
| service.shutdown(); |
| service.awaitTermination(1, TimeUnit.DAYS); |
| } catch (InterruptedException e) { |
| LOGGER.error(e.getMessage(), e); |
| throw new CarbonDataWriterException(e); |
| } |
| } |
| |
| /** |
| * This method will iterate through future task list and check if any exception |
| * occurred during the thread execution |
| * |
| * @param taskList |
| * @throws CarbonDataWriterException |
| */ |
| private void processWriteTaskSubmitList(List<Future<Void>> taskList) |
| throws CarbonDataWriterException { |
| for (int i = 0; i < taskList.size(); i++) { |
| try { |
| taskList.get(i).get(); |
| } catch (InterruptedException | ExecutionException e) { |
| LOGGER.error(e.getMessage(), e); |
| throw new CarbonDataWriterException(e); |
| } |
| } |
| } |
| |
  // return the number of columns after complex columns are expanded into their children
| private int getExpandedComplexColsCount() { |
| return model.getExpandedComplexColsCount(); |
| } |
| |
| /** |
| * below method will be used to close the handler |
| */ |
| public void closeHandler() throws CarbonDataWriterException { |
| if (null != this.dataWriter) { |
| // wait until all blocklets have been finished writing |
| while (blockletProcessingCount.get() > 0) { |
| try { |
| Thread.sleep(50); |
| } catch (InterruptedException e) { |
| throw new CarbonDataWriterException(e); |
| } |
| } |
| consumerExecutorService.shutdownNow(); |
| processWriteTaskSubmitList(consumerExecutorServiceTaskList); |
| this.dataWriter.writeFooter(); |
| if (LOGGER.isDebugEnabled()) { |
| LOGGER.debug("All blocklets have been finished writing"); |
| } |
| // close all the open stream for both the files |
| this.dataWriter.closeWriter(); |
| } |
| this.dataWriter = null; |
| } |
| |
| /** |
| * Below method will be to configure fact file writing configuration |
| * |
| * @throws CarbonDataWriterException |
| */ |
| private void setWritingConfiguration() throws CarbonDataWriterException { |
| // get blocklet size |
| this.pageSize = Integer.parseInt(CarbonProperties.getInstance() |
| .getProperty(CarbonCommonConstants.BLOCKLET_SIZE, |
| CarbonCommonConstants.BLOCKLET_SIZE_DEFAULT_VAL)); |
    // cap the page at 32,000 rows: since super long strings are supported, a column page
    // with more rows than that could exceed 2GB
    if (version == ColumnarFormatVersion.V3) {
      this.pageSize = Math.min(pageSize,
          CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_DEFAULT);
    }
| if (LOGGER.isDebugEnabled()) { |
| LOGGER.debug("Number of rows per column page is configured as pageSize = " + pageSize); |
| } |
| dataRows = new ArrayList<>(this.pageSize); |
| int dimSet = |
| Integer.parseInt(CarbonCommonConstants.DIMENSION_SPLIT_VALUE_IN_COLUMNAR_DEFAULTVALUE); |
    // key block size for each column after complex columns are expanded into their children
| int[] keyBlockSize = new int[getExpandedComplexColsCount()]; |
| |
    // classify measures by data type: byte and decimal measures need custom handling
| List<Integer> otherMeasureIndexList = |
| new ArrayList<Integer>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); |
| List<Integer> customMeasureIndexList = |
| new ArrayList<Integer>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); |
| DataType[] type = model.getMeasureDataType(); |
| for (int j = 0; j < type.length; j++) { |
| if (type[j] != DataTypes.BYTE && !DataTypes.isDecimal(type[j])) { |
| otherMeasureIndexList.add(j); |
| } else { |
| customMeasureIndexList.add(j); |
| } |
| } |
| |
| int[] otherMeasureIndex = new int[otherMeasureIndexList.size()]; |
| int[] customMeasureIndex = new int[customMeasureIndexList.size()]; |
| for (int i = 0; i < otherMeasureIndex.length; i++) { |
| otherMeasureIndex[i] = otherMeasureIndexList.get(i); |
| } |
| for (int i = 0; i < customMeasureIndex.length; i++) { |
| customMeasureIndex[i] = customMeasureIndexList.get(i); |
| } |
| setComplexMapSurrogateIndex(model.getDimensionCount()); |
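    // derive a key size per dimension from the (incremented) cardinalities using an
    // equi-split generator, then expand complex columns into their child columns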
| int[] blockKeySize = getBlockKeySizeWithComplexTypes(new MultiDimKeyVarLengthEquiSplitGenerator( |
| CarbonUtil.getIncrementedCardinalityFullyFilled(model.getDimLens().clone()), (byte) dimSet) |
| .getBlockKeySize()); |
| System.arraycopy(blockKeySize, 0, keyBlockSize, 0, blockKeySize.length); |
| this.dataWriter = getFactDataWriter(); |
| // initialize the channel; |
| this.dataWriter.initializeWriter(); |
| } |
| |
| /** |
| * This method combines primitive dimensions with complex metadata columns |
| * |
| * @param primitiveBlockKeySize |
| * @return all dimensions cardinality including complex dimension metadata column |
| */ |
| private int[] getBlockKeySizeWithComplexTypes(int[] primitiveBlockKeySize) { |
| int allColsCount = getExpandedComplexColsCount(); |
| int[] blockKeySizeWithComplexTypes = new int[allColsCount]; |
| |
| List<Integer> blockKeySizeWithComplex = |
| new ArrayList<Integer>(blockKeySizeWithComplexTypes.length); |
| int dictDimensionCount = model.getDimensionCount(); |
| for (int i = 0; i < dictDimensionCount; i++) { |
| GenericDataType complexDataType = model.getComplexIndexMap().get(i); |
| if (complexDataType != null) { |
| complexDataType.fillBlockKeySize(blockKeySizeWithComplex, primitiveBlockKeySize); |
| } else { |
| blockKeySizeWithComplex.add(primitiveBlockKeySize[i]); |
| } |
| } |
| for (int i = 0; i < blockKeySizeWithComplexTypes.length; i++) { |
| blockKeySizeWithComplexTypes[i] = blockKeySizeWithComplex.get(i); |
| } |
| |
| return blockKeySizeWithComplexTypes; |
| } |
| |
| /** |
| * Below method will be used to get the fact data writer instance |
| * |
| * @return data writer instance |
| */ |
| private CarbonFactDataWriter getFactDataWriter() { |
| return CarbonDataWriterFactory.getInstance().getFactDataWriter(version, model); |
| } |
| |
| /** |
| * This method will reset the block processing count |
| */ |
| private void resetBlockletProcessingCount() { |
| blockletProcessingCount.set(0); |
| } |
| |
| /** |
| * This class will hold the table page data |
| */ |
| private final class TablePageList { |
| /** |
| * array of table page added by Producer and get by Consumer |
| */ |
| private TablePage[] tablePages; |
| /** |
| * flag to check whether the producer has completed processing for holder |
| * object which is required to be picked form an index |
| */ |
| private AtomicBoolean available; |
| /** |
| * index from which data node holder object needs to be picked for writing |
| */ |
| private int currentIndex; |
| |
| private TablePageList() { |
| tablePages = new TablePage[numberOfCores]; |
| available = new AtomicBoolean(false); |
| } |
| |
| /** |
| * @return a node holder object |
| * @throws InterruptedException if consumer thread is interrupted |
| */ |
| public synchronized TablePage get() throws InterruptedException { |
| TablePage tablePage = tablePages[currentIndex]; |
      // a null page means the producer filling this slot has not completed yet
| if (null == tablePage && !processingComplete) { |
| available.set(false); |
| } |
| while (!available.get()) { |
| wait(); |
| } |
| tablePage = tablePages[currentIndex]; |
| tablePages[currentIndex] = null; |
| currentIndex++; |
      // wrap the current index when it reaches the end of the page array
| if (currentIndex >= tablePages.length) { |
| currentIndex = 0; |
| } |
| return tablePage; |
| } |
| |
| /** |
| * @param tablePage |
| * @param index |
| */ |
| public synchronized void put(TablePage tablePage, int index) { |
| tablePages[index] = tablePage; |
      // notify the consumer when the page was inserted at the index it is
      // currently waiting on
| if (index == currentIndex) { |
| available.set(true); |
| notifyAll(); |
| } |
| } |
| } |
| |
| /** |
| * Producer which will process data equivalent to 1 blocklet size |
| */ |
| private final class Producer implements Callable<Void> { |
| |
| private TablePageList tablePageList; |
| private List<CarbonRow> dataRows; |
| private int pageId; |
| private boolean isLastPage; |
| |
| private Producer(TablePageList tablePageList, List<CarbonRow> dataRows, |
| int pageId, boolean isLastPage) { |
| this.tablePageList = tablePageList; |
| this.dataRows = dataRows; |
| this.pageId = pageId; |
| this.isLastPage = isLastPage; |
| } |
| |
| /** |
| * Computes a result, or throws an exception if unable to do so. |
| * |
| * @return computed result |
| * @throws Exception if unable to compute a result |
| */ |
| @Override |
| public Void call() throws Exception { |
| try { |
| TablePage tablePage = processDataRows(dataRows); |
| dataRows = null; |
| tablePage.setIsLastPage(isLastPage); |
| // insert the object in array according to sequence number |
| int indexInNodeHolderArray = (pageId - 1) % numberOfCores; |
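        // slots are reused cyclically; the semaphore in addDataToStore ensures at most
        // numberOfCores pages are in flight, so a slot is never overwritten before consumption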
| tablePageList.put(tablePage, indexInNodeHolderArray); |
| return null; |
| } catch (Throwable throwable) { |
| LOGGER.error("Error in producer", throwable); |
| consumerExecutorService.shutdownNow(); |
| resetBlockletProcessingCount(); |
| throw new CarbonDataWriterException(throwable.getMessage(), throwable); |
| } |
| } |
| } |
| |
| /** |
| * Consumer class will get one blocklet data at a time and submit for writing |
| */ |
| private final class Consumer implements Callable<Void> { |
| |
| private TablePageList tablePageList; |
| |
| private Consumer(TablePageList tablePageList) { |
| this.tablePageList = tablePageList; |
| } |
| |
| /** |
| * Computes a result, or throws an exception if unable to do so. |
| * |
| * @return computed result |
| * @throws Exception if unable to compute a result |
| */ |
| @Override |
| public Void call() throws Exception { |
| while (!processingComplete || blockletProcessingCount.get() > 0) { |
| TablePage tablePage = null; |
| try { |
| tablePage = tablePageList.get(); |
| if (null != tablePage) { |
| dataWriter.writeTablePage(tablePage); |
| tablePage.freeMemory(); |
| } |
| blockletProcessingCount.decrementAndGet(); |
| } catch (Throwable throwable) { |
| if (!processingComplete || blockletProcessingCount.get() > 0) { |
| producerExecutorService.shutdownNow(); |
| resetBlockletProcessingCount(); |
| LOGGER.error("Problem while writing the carbon data file", throwable); |
| throw new CarbonDataWriterException(throwable); |
| } |
| } finally { |
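          // release a permit so that addDataToStore can submit the next page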
| semaphore.release(); |
| } |
| } |
| return null; |
| } |
| } |
| } |