| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.carbondata.processing.loading.sort.unsafe; |
| |
| import java.io.ByteArrayOutputStream; |
| import java.io.DataOutputStream; |
| import java.io.File; |
| import java.io.IOException; |
| import java.util.Random; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.Semaphore; |
| import java.util.concurrent.TimeUnit; |
| |
| import org.apache.carbondata.common.logging.LogServiceFactory; |
| import org.apache.carbondata.core.constants.CarbonCommonConstants; |
| import org.apache.carbondata.core.datastore.impl.FileFactory; |
| import org.apache.carbondata.core.memory.CarbonUnsafe; |
| import org.apache.carbondata.core.memory.IntPointerBuffer; |
| import org.apache.carbondata.core.memory.MemoryBlock; |
| import org.apache.carbondata.core.memory.MemoryException; |
| import org.apache.carbondata.core.memory.UnsafeMemoryManager; |
| import org.apache.carbondata.core.memory.UnsafeSortMemoryManager; |
| import org.apache.carbondata.core.util.CarbonProperties; |
| import org.apache.carbondata.core.util.CarbonThreadFactory; |
| import org.apache.carbondata.core.util.CarbonUtil; |
| import org.apache.carbondata.core.util.ReUsableByteArrayDataOutputStream; |
| import org.apache.carbondata.core.util.ThreadLocalTaskInfo; |
| import org.apache.carbondata.processing.loading.sort.unsafe.comparator.UnsafeRowComparator; |
| import org.apache.carbondata.processing.loading.sort.unsafe.comparator.UnsafeRowComparatorForNormalDims; |
| import org.apache.carbondata.processing.loading.sort.unsafe.holder.UnsafeCarbonRow; |
| import org.apache.carbondata.processing.loading.sort.unsafe.merger.UnsafeIntermediateMerger; |
| import org.apache.carbondata.processing.loading.sort.unsafe.sort.TimSort; |
| import org.apache.carbondata.processing.loading.sort.unsafe.sort.UnsafeIntSortDataFormat; |
| import org.apache.carbondata.processing.sort.exception.CarbonSortKeyAndGroupByException; |
| import org.apache.carbondata.processing.sort.sortdata.SortParameters; |
| import org.apache.carbondata.processing.sort.sortdata.TableFieldStat; |
| import org.apache.carbondata.processing.util.CarbonDataProcessorUtil; |
| |
| import org.apache.log4j.Logger; |
| |
| public class UnsafeSortDataRows { |
| /** |
|  * Logger for this class. |
|  */ |
| private static final Logger LOGGER = |
|     LogServiceFactory.getLogService(UnsafeSortDataRows.class.getName()); |
| /** |
|  * Notified by the sort-and-write tasks when one of them fails. |
|  */ |
| private final ThreadStatusObserver threadStatusObserver; |
| /** |
|  * Executor that runs one sort-and-write task for every row page that is handed over. |
|  */ |
| private ExecutorService dataSorterAndWriterExecutorService; |
| |
| private final SortParameters parameters; |
| private final TableFieldStat tableFieldStat; |
| /** |
|  * Per-thread stream that is passed to the row page whenever a row is added. |
|  */ |
| private final ThreadLocal<ReUsableByteArrayDataOutputStream> reUsableByteArrayDataOutputStream; |
| private final UnsafeIntermediateMerger unsafeInMemoryIntermediateFileMerger; |
| |
| /** |
|  * Page that currently receives rows; set to {@code null} when allocating a new page fails. |
|  */ |
| private UnsafeCarbonRowPage rowPage; |
| |
| private final Object addRowsLock = new Object(); |
| |
| /** |
|  * Size in bytes of the memory block requested for each row page. |
|  */ |
| private final long inMemoryChunkSize; |
| |
| private final boolean enableInMemoryIntermediateMerge; |
| |
| /** |
|  * Running total of the bytes reported by the row page for rows added in batches. It is |
|  * compared with {@code maxSizeAllowed}, which is a long, so it is kept as a long as well: |
|  * an int counter would wrap to a negative value after 2 GB and never reach the limit. |
|  */ |
| private long bytesAdded; |
| |
| /** |
|  * Upper bound for {@code bytesAdded}; half of the usable memory reported by |
|  * {@code UnsafeMemoryManager} at construction time. |
|  */ |
| private final long maxSizeAllowed; |
| |
| /** |
|  * Bounds the number of row pages handed to the executor: a permit is acquired before a page |
|  * is submitted and released when its task ends. Created in {@code initialize()} with one |
|  * permit per configured core. |
|  */ |
| private Semaphore semaphore; |
| |
| private final String taskId; |
| |
| public UnsafeSortDataRows(SortParameters parameters, |
| UnsafeIntermediateMerger unsafeInMemoryIntermediateFileMerger, int inMemoryChunkSize) { |
| this.parameters = parameters; |
| this.tableFieldStat = new TableFieldStat(parameters); |
| this.reUsableByteArrayDataOutputStream = new ThreadLocal<ReUsableByteArrayDataOutputStream>() { |
| @Override |
| protected ReUsableByteArrayDataOutputStream initialValue() { |
| ByteArrayOutputStream byteStream = new ByteArrayOutputStream(); |
| return new ReUsableByteArrayDataOutputStream(byteStream); |
| } |
| }; |
| this.unsafeInMemoryIntermediateFileMerger = unsafeInMemoryIntermediateFileMerger; |
| |
| // observer of writing file in thread |
| this.threadStatusObserver = new ThreadStatusObserver(); |
| this.taskId = ThreadLocalTaskInfo.getCarbonTaskInfo().getTaskId(); |
| this.inMemoryChunkSize = inMemoryChunkSize * 1024L * 1024L; |
| enableInMemoryIntermediateMerge = Boolean.parseBoolean(CarbonProperties.getInstance() |
| .getProperty(CarbonCommonConstants.ENABLE_INMEMORY_MERGE_SORT, |
| CarbonCommonConstants.ENABLE_INMEMORY_MERGE_SORT_DEFAULT)); |
| |
| // Take half the size of usable memory configured in sort memory size. |
| this.maxSizeAllowed = UnsafeMemoryManager.INSTANCE.getUsableMemory() / 2; |
| } |
| |
| /** |
|  * Prepares this instance for accepting rows: allocates the first row page, removes and |
|  * recreates the sort temp locations, and creates the worker pool together with the semaphore |
|  * that limits how many pages are handed to it. |
|  * |
|  * @throws MemoryException declared by the row page allocation |
|  * @throws CarbonSortKeyAndGroupByException declared by the row page allocation |
|  */ |
| public void initialize() throws MemoryException, CarbonSortKeyAndGroupByException { |
|   this.rowPage = createUnsafeRowPage(); |
| |
|   // start from a clean sort temp folder |
|   deleteSortLocationIfExists(); |
|   CarbonDataProcessorUtil.createLocations(parameters.getTempFileLocation()); |
| |
|   // one worker thread and one semaphore permit per configured core |
|   final int workerCount = parameters.getNumberOfCores(); |
|   final CarbonThreadFactory threadFactory = |
|       new CarbonThreadFactory("UnsafeSortDataRowPool:" + parameters.getTableName(), true); |
|   this.dataSorterAndWriterExecutorService = |
|       Executors.newFixedThreadPool(workerCount, threadFactory); |
|   this.semaphore = new Semaphore(workerCount); |
| } |
| |
| /** |
|  * Allocates a memory block of {@code inMemoryChunkSize} bytes for this task and wraps it in |
|  * a new row page. If the sort memory manager reports that a block of that size is not |
|  * available, the intermediate merger is asked to trigger in-memory merging first. |
|  * |
|  * @return a new, empty row page backed by the allocated block |
|  * @throws MemoryException declared by the memory allocation |
|  * @throws CarbonSortKeyAndGroupByException declared by the merge trigger |
|  */ |
| private UnsafeCarbonRowPage createUnsafeRowPage() |
|     throws MemoryException, CarbonSortKeyAndGroupByException { |
|   MemoryBlock workingBlock = |
|       UnsafeMemoryManager.allocateMemoryWithRetry(this.taskId, inMemoryChunkSize); |
|   if (!UnsafeSortMemoryManager.INSTANCE.isMemoryAvailable(workingBlock.size())) { |
|     unsafeInMemoryIntermediateFileMerger.tryTriggerInMemoryMerging(true); |
|   } |
|   return new UnsafeCarbonRowPage(tableFieldStat, workingBlock, taskId); |
| } |
| |
| /** |
| * Tells whether the number of bytes counted so far is still below the allowed maximum. |
| * |
| * @return {@code true} while {@code bytesAdded} is less than {@code maxSizeAllowed} |
| */ |
| public boolean canAdd() { |
| return bytesAdded < maxSizeAllowed; |
| } |
| |
| /** |
| * Adds the first {@code size} rows of {@code rowBatch} while holding {@code addRowsLock}, |
| * so that callers of this method do not add rows at the same time. |
| * |
| * @param rowBatch rows to add |
| * @param size number of leading entries of {@code rowBatch} to add |
| * @throws CarbonSortKeyAndGroupByException if adding a row or handing over a full page fails |
| */ |
| public void addRowBatch(Object[][] rowBatch, int size) throws CarbonSortKeyAndGroupByException { |
| // addBatch hands a full page over for sorting and continues with a new page; |
| // the lock serializes that work between callers of this method |
| synchronized (addRowsLock) { |
| addBatch(rowBatch, size); |
| } |
| } |
| |
| /** |
| * Adds the first {@code size} rows of {@code rowBatch} without taking {@code addRowsLock}. |
| * |
| * @param rowBatch rows to add |
| * @param size number of leading entries of {@code rowBatch} to add |
| * @throws CarbonSortKeyAndGroupByException if adding a row or handing over a full page fails |
| */ |
| public void addRowBatchWithOutSync(Object[][] rowBatch, int size) |
| throws CarbonSortKeyAndGroupByException { |
| // same work as addRowBatch, but no lock is taken here |
| // NOTE(review): presumably meant for callers that do not share this instance - confirm |
| addBatch(rowBatch, size); |
| } |
| |
| /** |
|  * Adds the first {@code size} rows of {@code rowBatch} to the current row page. |
|  * |
|  * <p>When the page reports that it cannot take more rows, it is handed over for sorting and |
|  * a new page is allocated. When adding a row fails with the message |
|  * "cannot handle this row. create new page", the page is marked as full and the same row is |
|  * tried again, which moves it to a new page. Nothing is added when {@code rowPage} is |
|  * {@code null}, which is the state left behind by a failed page allocation. |
|  * |
|  * @param rowBatch rows to add |
|  * @param size number of leading entries of {@code rowBatch} to add |
|  * @throws CarbonSortKeyAndGroupByException if a page cannot be handed over or allocated, or |
|  *         if adding a row fails for any reason other than the page being full |
|  */ |
| private void addBatch(Object[][] rowBatch, int size) throws CarbonSortKeyAndGroupByException { |
|   if (rowPage == null) { |
|     return; |
|   } |
|   for (int i = 0; i < size; i++) { |
|     try { |
|       if (!rowPage.canAdd()) { |
|         handlePreviousPage(); |
|         try { |
|           rowPage = createUnsafeRowPage(); |
|         } catch (Exception ex) { |
|           // row page has freed in handlePreviousPage(), so other iterator may try to access it. |
|           rowPage = null; |
|           LOGGER.error("exception occurred while trying to acquire a semaphore lock: " |
|               + ex.getMessage(), ex); |
|           throw new CarbonSortKeyAndGroupByException(ex); |
|         } |
|       } |
|       bytesAdded += rowPage.addRow(rowBatch[i], reUsableByteArrayDataOutputStream.get()); |
|     } catch (Exception e) { |
|       // getMessage() may be null and rowPage may have been reset above; check both so the |
|       // original failure is not replaced by a NullPointerException thrown from this handler |
|       String message = e.getMessage(); |
|       if (rowPage != null && message != null |
|           && message.contains("cannot handle this row. create new page")) { |
|         rowPage.makeCanAddFail(); |
|         // so that same rowBatch will be handled again in new page |
|         i--; |
|       } else { |
|         if (e instanceof InterruptedException) { |
|           // keep the interrupt visible to the caller after wrapping the exception |
|           Thread.currentThread().interrupt(); |
|         } |
|         LOGGER.error( |
|             "exception occurred while trying to acquire a semaphore lock: " + e.getMessage(), e); |
|         if (e instanceof CarbonSortKeyAndGroupByException) { |
|           // already the declared type, no need to wrap it a second time |
|           throw (CarbonSortKeyAndGroupByException) e; |
|         } |
|         throw new CarbonSortKeyAndGroupByException(e); |
|       } |
|     } |
|   } |
| } |
| |
| /** |
| * This method will be used to add new row |
| */ |
| public void addRow(Object[] row) throws CarbonSortKeyAndGroupByException { |
| if (rowPage == null) { |
| return; |
| } |
| try { |
| // if record holder list size is equal to sort buffer size then it will |
| // sort the list and then write current list data to file |
| if (!rowPage.canAdd()) { |
| handlePreviousPage(); |
| try { |
| rowPage = createUnsafeRowPage(); |
| } catch (Exception ex) { |
| rowPage = null; |
| LOGGER.error("exception occurred while trying to acquire a semaphore lock: " |
| + ex.getMessage(), ex); |
| throw new CarbonSortKeyAndGroupByException(ex); |
| } |
| } |
| rowPage.addRow(row, reUsableByteArrayDataOutputStream.get()); |
| } catch (Exception e) { |
| if (e.getMessage().contains("cannot handle this row. create new page")) { |
| rowPage.makeCanAddFail(); |
| addRow(row); |
| } else { |
| LOGGER.error( |
| "exception occurred while trying to acquire a semaphore lock: " + e.getMessage(), e); |
| throw new CarbonSortKeyAndGroupByException(e); |
| } |
| } |
| } |
| |
| /** |
|  * Finishes row collection. If the current row page holds data it is handed over for sorting |
|  * like any full page, otherwise its memory is freed. After that the worker pool is shut down |
|  * and this method waits for the submitted tasks to finish. |
|  * |
|  * <p>The add methods reset {@code rowPage} to {@code null} when a page allocation fails; in |
|  * that state there is no page to hand over or free, so only the worker pool is shut down. |
|  * |
|  * @throws CarbonSortKeyAndGroupByException if handing over the last page fails |
|  * @throws InterruptedException if the thread is interrupted while waiting for a permit or |
|  *         for the worker pool to terminate |
|  */ |
| public void startSorting() throws CarbonSortKeyAndGroupByException, InterruptedException { |
|   LOGGER.info("Unsafe based sorting will be used"); |
|   if (rowPage != null) { |
|     if (rowPage.getUsedSize() > 0) { |
|       handlePreviousPage(); |
|     } else { |
|       rowPage.freeMemory(); |
|     } |
|   } |
|   startFileBasedMerge(); |
| } |
| |
| /** |
|  * Hands the current row page over for sorting. Before that, in-memory merging (only when it |
|  * is enabled) and file merging are started if possible. The page is then submitted to the |
|  * worker pool once a semaphore permit is available; the submitted task releases the permit |
|  * when it ends. |
|  * |
|  * @throws CarbonSortKeyAndGroupByException declared by the merge triggers |
|  * @throws InterruptedException if the thread is interrupted while waiting for a permit |
|  */ |
| private void handlePreviousPage() |
|     throws CarbonSortKeyAndGroupByException, InterruptedException { |
|   if (enableInMemoryIntermediateMerge) { |
|     unsafeInMemoryIntermediateFileMerger.startInmemoryMergingIfPossible(); |
|   } |
|   unsafeInMemoryIntermediateFileMerger.startFileMergingIfPossible(); |
|   semaphore.acquire(); |
|   try { |
|     dataSorterAndWriterExecutorService.submit(new DataSorterAndWriter(rowPage)); |
|   } catch (RuntimeException e) { |
|     // the task never started (for example the pool was already shut down), so nobody else |
|     // would give this permit back |
|     semaphore.release(); |
|     throw e; |
|   } |
| } |
| |
| /** |
| * write a page to sort temp file |
| * @param rowPage page |
| * @param file file |
| * @throws CarbonSortKeyAndGroupByException |
| */ |
| private void writeDataToFile(UnsafeCarbonRowPage rowPage, File file) |
| throws CarbonSortKeyAndGroupByException { |
| DataOutputStream stream = null; |
| try { |
| // open stream |
| stream = FileFactory.getDataOutputStream(file.getPath(), |
| parameters.getFileWriteBufferSize(), parameters.getSortTempCompressorName()); |
| int actualSize = rowPage.getBuffer().getActualSize(); |
| // write number of entries to the file |
| stream.writeInt(actualSize); |
| for (int i = 0; i < actualSize; i++) { |
| rowPage.writeRow( |
| rowPage.getBuffer().get(i) + rowPage.getDataBlock().getBaseOffset(), stream); |
| } |
| } catch (IOException | MemoryException e) { |
| throw new CarbonSortKeyAndGroupByException("Problem while writing the file", e); |
| } finally { |
| // close streams |
| CarbonUtil.closeStreams(stream); |
| } |
| } |
| |
| /** |
| * Deletes the sort temp locations of this load if they exist. The work is delegated to |
| * {@code CarbonDataProcessorUtil} with the temp file locations from the sort parameters. |
| */ |
| public void deleteSortLocationIfExists() { |
| CarbonDataProcessorUtil.deleteSortLocationIfExists(parameters.getTempFileLocation()); |
| } |
| |
| /** |
| * Shuts down the sort-and-write executor and waits for the already submitted tasks to |
| * finish. Despite its name, no merge is started in this method. |
| * |
| * @throws InterruptedException if the thread is interrupted while waiting |
| */ |
| private void startFileBasedMerge() throws InterruptedException { |
| // no new pages are accepted from here on; queued tasks still run |
| dataSorterAndWriterExecutorService.shutdown(); |
| // waits at most 2 days; the boolean result (terminated or timed out) is not checked |
| dataSorterAndWriterExecutorService.awaitTermination(2, TimeUnit.DAYS); |
| } |
| |
| /** |
| * Observer for the sort-and-write tasks. |
| * When a task fails, the remaining work is stopped and the failure is recorded. |
| */ |
| private class ThreadStatusObserver { |
| /** |
| * Called when a task fails during execution. Releases one semaphore permit, shuts the |
| * executor down immediately, closes the intermediate merger, marks the observer of the sort |
| * parameters as failed and logs the exception. This method always ends by throwing. |
| * |
| * @param exception the failure reported by the task |
| * @throws CarbonSortKeyAndGroupByException always, wrapping {@code exception} |
| */ |
| public void notifyFailed(Throwable exception) throws CarbonSortKeyAndGroupByException { |
| // NOTE(review): DataSorterAndWriter.run() releases a permit in its finally block as well, |
| // so a failed task gives back two permits - confirm this is intended |
| semaphore.release(); |
| dataSorterAndWriterExecutorService.shutdownNow(); |
| unsafeInMemoryIntermediateFileMerger.close(); |
| parameters.getObserver().setFailed(true); |
| LOGGER.error(exception); |
| throw new CarbonSortKeyAndGroupByException(exception); |
| } |
| } |
| |
| /** |
| * This class is responsible for sorting and writing the object |
| * array which holds the records equal to given array size |
| */ |
| private class DataSorterAndWriter implements Runnable { |
| private UnsafeCarbonRowPage page; |
| |
| public DataSorterAndWriter(UnsafeCarbonRowPage rowPage) { |
| this.page = rowPage; |
| } |
| |
| @Override |
| public void run() { |
| try { |
| long startTime = System.currentTimeMillis(); |
| TimSort<UnsafeCarbonRow, IntPointerBuffer> timSort = new TimSort<>( |
| new UnsafeIntSortDataFormat(page)); |
| // if sort_columns is not none, sort by sort_columns |
| if (parameters.getNumberOfNoDictSortColumns() > 0) { |
| timSort.sort(page.getBuffer(), 0, page.getBuffer().getActualSize(), |
| new UnsafeRowComparator(page)); |
| } else { |
| timSort.sort(page.getBuffer(), 0, page.getBuffer().getActualSize(), |
| new UnsafeRowComparatorForNormalDims(page)); |
| } |
| // get sort storage memory block if memory is available in sort storage manager |
| // if space is available then store it in memory, if memory is not available |
| // then spill to disk |
| MemoryBlock sortStorageMemoryBlock = |
| UnsafeSortMemoryManager.INSTANCE.allocateMemory(taskId, page.getDataBlock().size()); |
| if (null == sortStorageMemoryBlock) { |
| // create a new file every time |
| // create a new file and pick a temp directory randomly every time |
| String tmpDir = parameters.getTempFileLocation()[ |
| new Random().nextInt(parameters.getTempFileLocation().length)]; |
| File sortTempFile = new File(tmpDir + File.separator + parameters.getTableName() |
| + '_' + parameters.getRangeId() + '_' + System.nanoTime() |
| + CarbonCommonConstants.SORT_TEMP_FILE_EXT); |
| writeDataToFile(page, sortTempFile); |
| LOGGER.info("Time taken to sort row page with size" + page.getBuffer().getActualSize() |
| + " and write is: " + (System.currentTimeMillis() - startTime) + ": location:" |
| + sortTempFile + ", sort temp file size in MB is " |
| + sortTempFile.length() * 0.1 * 10 / 1024 / 1024); |
| page.freeMemory(); |
| // add sort temp filename to and arrayList. When the list size reaches 20 then |
| // intermediate merging of sort temp files will be triggered |
| unsafeInMemoryIntermediateFileMerger.addFileToMerge(sortTempFile); |
| } else { |
| // copying data from working memory manager block to storage memory manager block |
| CarbonUnsafe.getUnsafe() |
| .copyMemory(page.getDataBlock().getBaseObject(), page.getDataBlock().getBaseOffset(), |
| sortStorageMemoryBlock.getBaseObject(), |
| sortStorageMemoryBlock.getBaseOffset(), page.getDataBlock().size()); |
| // free unsafememory manager |
| page.freeMemory(); |
| page.setNewDataBlock(sortStorageMemoryBlock); |
| // add sort temp filename to and arrayList. When the list size reaches 20 then |
| // intermediate merging of sort temp files will be triggered |
| page.getBuffer().loadToUnsafe(); |
| unsafeInMemoryIntermediateFileMerger.addDataChunkToMerge(page); |
| LOGGER.info( |
| "Time taken to sort row page with size: " + page.getBuffer().getActualSize() + " is: " |
| + (System.currentTimeMillis() - startTime)); |
| } |
| } catch (Throwable e) { |
| try { |
| threadStatusObserver.notifyFailed(e); |
| } catch (CarbonSortKeyAndGroupByException ex) { |
| LOGGER.error(e.getMessage(), e); |
| } |
| } finally { |
| semaphore.release(); |
| } |
| } |
| } |
| } |
| |