| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.carbondata.processing.loading.steps; |
| |
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
import org.apache.carbondata.core.datastore.row.CarbonRow;
import org.apache.carbondata.core.localdictionary.generator.LocalDictionaryGenerator;
import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
import org.apache.carbondata.core.util.CarbonThreadFactory;
import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.core.util.path.CarbonTablePath;
import org.apache.carbondata.processing.index.IndexWriterListener;
import org.apache.carbondata.processing.loading.AbstractDataLoadProcessorStep;
import org.apache.carbondata.processing.loading.CarbonDataLoadConfiguration;
import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException;
import org.apache.carbondata.processing.loading.row.CarbonRowBatch;
import org.apache.carbondata.processing.store.CarbonFactDataHandlerModel;
import org.apache.carbondata.processing.store.CarbonFactHandler;
import org.apache.carbondata.processing.store.CarbonFactHandlerFactory;
import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;

import org.apache.log4j.Logger;
| |
| /** |
| * It reads data from sorted files which are generated in previous sort step. |
| * And it writes data to carbondata file. It also generates mdk key while writing to carbondata file |
| */ |
| public class DataWriterProcessorStepImpl extends AbstractDataLoadProcessorStep { |
| |
| private static final Logger LOGGER = |
| LogServiceFactory.getLogService(DataWriterProcessorStepImpl.class.getName()); |
| |
| private long readCounter; |
| |
| private IndexWriterListener listener; |
| |
| private final Map<String, LocalDictionaryGenerator> localDictionaryGeneratorMap; |
| |
| private ExecutorService rangeExecutorService; |
| |
| private List<CarbonFactHandler> carbonFactHandlers; |
| |
| public DataWriterProcessorStepImpl(CarbonDataLoadConfiguration configuration, |
| AbstractDataLoadProcessorStep child) { |
| super(configuration, child); |
| this.localDictionaryGeneratorMap = |
| CarbonUtil.getLocalDictionaryModel(configuration.getTableSpec().getCarbonTable()); |
| } |
| |
| public DataWriterProcessorStepImpl(CarbonDataLoadConfiguration configuration) { |
| super(configuration, null); |
| this.localDictionaryGeneratorMap = |
| CarbonUtil.getLocalDictionaryModel(configuration.getTableSpec().getCarbonTable()); |
| } |
| |
| @Override |
| public void initialize() throws IOException { |
| super.initialize(); |
| child.initialize(); |
| this.carbonFactHandlers = new CopyOnWriteArrayList<>(); |
| } |
| |
| private String[] getStoreLocation() { |
| String[] storeLocation = CarbonDataProcessorUtil |
| .getLocalDataFolderLocation(configuration.getTableSpec().getCarbonTable(), |
| String.valueOf(configuration.getTaskNo()), configuration.getSegmentId(), false, false); |
| CarbonDataProcessorUtil.createLocations(storeLocation); |
| return storeLocation; |
| } |
| |
| public CarbonFactDataHandlerModel getDataHandlerModel() { |
| String[] storeLocation = getStoreLocation(); |
| listener = getIndexWriterListener(0); |
| CarbonFactDataHandlerModel carbonFactDataHandlerModel = CarbonFactDataHandlerModel |
| .createCarbonFactDataHandlerModel(configuration, storeLocation, 0, 0, listener); |
| carbonFactDataHandlerModel.setColumnLocalDictGenMap(localDictionaryGeneratorMap); |
| return carbonFactDataHandlerModel; |
| } |
| |
| @Override |
| public Iterator<CarbonRowBatch>[] execute() throws CarbonDataLoadingException { |
| Iterator<CarbonRowBatch>[] iterators = child.execute(); |
| CarbonTableIdentifier tableIdentifier = |
| configuration.getTableIdentifier().getCarbonTableIdentifier(); |
| String tableName = tableIdentifier.getTableName(); |
| try { |
| CarbonTimeStatisticsFactory.getLoadStatisticsInstance() |
| .recordDictionaryValue2MdkAdd2FileTime(CarbonTablePath.DEPRECATED_PARTITION_ID, |
| System.currentTimeMillis()); |
| rangeExecutorService = Executors.newFixedThreadPool(iterators.length, |
| new CarbonThreadFactory("WriterForwardPool: " + tableName, true)); |
| List<Future<Void>> rangeExecutorServiceSubmitList = new ArrayList<>(iterators.length); |
| int i = 0; |
| // do this concurrently |
| for (Iterator<CarbonRowBatch> iterator : iterators) { |
| rangeExecutorServiceSubmitList.add( |
| rangeExecutorService.submit(new WriterForwarder(iterator, i))); |
| i++; |
| } |
| try { |
| rangeExecutorService.shutdown(); |
| rangeExecutorService.awaitTermination(2, TimeUnit.DAYS); |
| for (int j = 0; j < rangeExecutorServiceSubmitList.size(); j++) { |
| rangeExecutorServiceSubmitList.get(j).get(); |
| } |
| } catch (InterruptedException e) { |
| throw new CarbonDataWriterException(e); |
| } catch (ExecutionException e) { |
| throw new CarbonDataWriterException(e.getCause()); |
| } |
| } catch (CarbonDataWriterException e) { |
| LOGGER.error(e); |
| throw new CarbonDataLoadingException("Error while initializing writer: " + e.getMessage(), e); |
| } catch (Exception e) { |
| throw new CarbonDataLoadingException("There is an unexpected error: " + e.getMessage(), e); |
| } |
| return null; |
| } |
| |
| @Override |
| protected String getStepName() { |
| return "Data Writer"; |
| } |
| |
| /** |
| * Used to forward rows to different ranges based on range id. |
| */ |
| private final class WriterForwarder implements Callable<Void> { |
| private Iterator<CarbonRowBatch> insideRangeIterator; |
| private int rangeId; |
| |
| WriterForwarder(Iterator<CarbonRowBatch> insideRangeIterator, int rangeId) { |
| this.insideRangeIterator = insideRangeIterator; |
| this.rangeId = rangeId; |
| } |
| |
| @Override |
| public Void call() { |
| processRange(insideRangeIterator, rangeId); |
| return null; |
| } |
| } |
| |
| private void processRange(Iterator<CarbonRowBatch> insideRangeIterator, int rangeId) { |
| String[] storeLocation = getStoreLocation(); |
| |
| listener = getIndexWriterListener(rangeId); |
| CarbonFactDataHandlerModel model = CarbonFactDataHandlerModel |
| .createCarbonFactDataHandlerModel(configuration, storeLocation, rangeId, 0, listener); |
| model.setColumnLocalDictGenMap(localDictionaryGeneratorMap); |
| CarbonFactHandler dataHandler = null; |
| boolean rowsNotExist = true; |
| while (insideRangeIterator.hasNext()) { |
| if (rowsNotExist) { |
| rowsNotExist = false; |
| dataHandler = CarbonFactHandlerFactory.createCarbonFactHandler(model); |
| carbonFactHandlers.add(dataHandler); |
| dataHandler.initialise(); |
| } |
| processBatch(insideRangeIterator.next(), dataHandler); |
| } |
| if (!rowsNotExist) { |
| finish(dataHandler); |
| } |
| carbonFactHandlers.remove(dataHandler); |
| } |
| |
| public void finish(CarbonFactHandler dataHandler) { |
| CarbonTableIdentifier tableIdentifier = |
| configuration.getTableIdentifier().getCarbonTableIdentifier(); |
| String tableName = tableIdentifier.getTableName(); |
| dataHandler.finish(); |
| if (LOGGER.isDebugEnabled()) { |
| LOGGER.debug("Record Processed For table: " + tableName); |
| LOGGER.debug("Finished Carbon DataWriterProcessorStepImpl: Read: " + readCounter + |
| ": Write: " + rowCounter.get()); |
| } |
| CarbonTimeStatisticsFactory.getLoadStatisticsInstance().recordTotalRecords(rowCounter.get()); |
| processingComplete(dataHandler); |
| CarbonTimeStatisticsFactory.getLoadStatisticsInstance() |
| .recordDictionaryValue2MdkAdd2FileTime(CarbonTablePath.DEPRECATED_PARTITION_ID, |
| System.currentTimeMillis()); |
| CarbonTimeStatisticsFactory.getLoadStatisticsInstance() |
| .recordMdkGenerateTotalTime(CarbonTablePath.DEPRECATED_PARTITION_ID, |
| System.currentTimeMillis()); |
| } |
| |
| private void processingComplete(CarbonFactHandler dataHandler) { |
| if (null != dataHandler) { |
| dataHandler.closeHandler(); |
| } |
| } |
| |
| private void processBatch(CarbonRowBatch batch, CarbonFactHandler dataHandler) { |
| while (batch.hasNext()) { |
| CarbonRow row = batch.next(); |
| dataHandler.addDataToStore(row); |
| readCounter++; |
| } |
| rowCounter.getAndAdd(batch.getSize()); |
| } |
| |
| public void processRow(CarbonRow row, CarbonFactHandler dataHandler) { |
| readCounter++; |
| dataHandler.addDataToStore(row); |
| rowCounter.getAndAdd(1); |
| } |
| |
| @Override |
| public void close() { |
| if (!closed) { |
| super.close(); |
| if (listener != null) { |
| try { |
| LOGGER.debug("closing all the Index writers registered to index writer listener"); |
| listener.finish(); |
| } catch (IOException e) { |
| LOGGER.error("error while closing the index writers", e); |
| // ignoring the exception |
| } |
| } |
| if (null != rangeExecutorService) { |
| rangeExecutorService.shutdownNow(); |
| } |
| if (null != this.carbonFactHandlers && !this.carbonFactHandlers.isEmpty()) { |
| for (CarbonFactHandler carbonFactHandler : this.carbonFactHandlers) { |
| carbonFactHandler.finish(); |
| carbonFactHandler.closeHandler(); |
| } |
| } |
| if (configuration.getMetrics() != null) { |
| configuration.getMetrics().addOutputRows(rowCounter.get()); |
| } |
| } |
| } |
| } |