blob: 16220604ec813da4bd83f9ced31d05f05e8a9eb2 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.carbondata.processing.loading.steps;
import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
import org.apache.carbondata.core.datastore.row.CarbonRow;
import org.apache.carbondata.core.localdictionary.generator.LocalDictionaryGenerator;
import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.core.util.path.CarbonTablePath;
import org.apache.carbondata.processing.datamap.DataMapWriterListener;
import org.apache.carbondata.processing.loading.AbstractDataLoadProcessorStep;
import org.apache.carbondata.processing.loading.CarbonDataLoadConfiguration;
import org.apache.carbondata.processing.loading.exception.BadRecordFoundException;
import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException;
import org.apache.carbondata.processing.loading.row.CarbonRowBatch;
import org.apache.carbondata.processing.store.CarbonFactDataHandlerModel;
import org.apache.carbondata.processing.store.CarbonFactHandler;
import org.apache.carbondata.processing.store.CarbonFactHandlerFactory;
import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
import org.apache.log4j.Logger;
/**
* Reads batches of sorted rows (held in memory or in disk-based files) that were
* produced by the preceding sort step, and writes them to carbondata files.
* It also generates the MDK key while writing to the carbondata file.
*/
public class DataWriterBatchProcessorStepImpl extends AbstractDataLoadProcessorStep {

  private static final Logger LOGGER =
      LogServiceFactory.getLogService(DataWriterBatchProcessorStepImpl.class.getName());

  /** Local dictionary generators of the table; attached to every handler model that is created. */
  private final Map<String, LocalDictionaryGenerator> localDictionaryGeneratorMap;

  /**
   * Handler that is currently writing a batch. It is non-null only between its creation and the
   * end of {@code finish}, so that {@link #close()} can clean up a handler that was left behind
   * by a failure in the middle of a batch.
   */
  private CarbonFactHandler carbonFactHandler;

  /**
   * Creates the step and loads the local dictionary model of the configured table.
   *
   * @param configuration load configuration of the current data load
   * @param child         the step whose output this step consumes
   */
  public DataWriterBatchProcessorStepImpl(CarbonDataLoadConfiguration configuration,
      AbstractDataLoadProcessorStep child) {
    super(configuration, child);
    this.localDictionaryGeneratorMap =
        CarbonUtil.getLocalDictionaryModel(configuration.getTableSpec().getCarbonTable());
  }

  /**
   * Initializes this step and then its child step.
   *
   * @throws IOException if either initialization fails
   */
  @Override
  public void initialize() throws IOException {
    super.initialize();
    child.initialize();
  }

  /**
   * Looks up the local data folder locations for the configured table, task number and segment.
   *
   * @return the local store locations used by the fact handlers of this step
   */
  private String[] getStoreLocation() {
    return CarbonDataProcessorUtil
        .getLocalDataFolderLocation(configuration.getTableSpec().getCarbonTable(),
            String.valueOf(configuration.getTaskNo()), configuration.getSegmentId(), false, false);
  }

  /**
   * Drains every iterator returned by the child step and writes each non-empty batch through its
   * own fact handler.
   *
   * @return always {@code null}; this step produces no iterators of its own
   * @throws BadRecordFoundException    if the failure was caused by a bad record
   * @throws CarbonDataLoadingException for any other failure while writing
   */
  @Override
  public Iterator<CarbonRowBatch>[] execute() throws CarbonDataLoadingException {
    Iterator<CarbonRowBatch>[] iterators = child.execute();
    CarbonTableIdentifier tableIdentifier =
        configuration.getTableIdentifier().getCarbonTableIdentifier();
    String tableName = tableIdentifier.getTableName();
    try {
      CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
          .recordDictionaryValue2MdkAdd2FileTime(CarbonTablePath.DEPRECATED_PARTITION_ID,
              System.currentTimeMillis());
      String[] storeLocation = getStoreLocation();
      CarbonDataProcessorUtil.createLocations(storeLocation);
      for (int iteratorIndex = 0; iteratorIndex < iterators.length; iteratorIndex++) {
        Iterator<CarbonRowBatch> iterator = iterators[iteratorIndex];
        // Counts only the batches that actually contained rows for this iterator.
        int batchIndex = 0;
        while (iterator.hasNext()) {
          CarbonRowBatch batch = iterator.next();
          // If no rows from merge sorter, then don't create a file in fact column handler
          if (batch.hasNext()) {
            writeBatch(batch, storeLocation, iteratorIndex, batchIndex++, tableName);
          }
        }
      }
    } catch (Exception e) {
      LOGGER.error("Failed for table: " + tableName + " in DataWriterBatchProcessorStepImpl", e);
      if (e.getCause() instanceof BadRecordFoundException) {
        throw new BadRecordFoundException(e.getCause().getMessage());
      }
      throw new CarbonDataLoadingException("There is an unexpected error: " + e.getMessage(), e);
    }
    return null;
  }

  /**
   * Writes one non-empty batch: creates and initialises a handler, feeds it all rows of the
   * batch, then finishes and closes it.
   *
   * @param batch         the batch to write; must contain at least one row
   * @param storeLocation local store locations handed to the handler model
   * @param iteratorIndex index of the child iterator the batch came from
   * @param batchIndex    running index of non-empty batches within that iterator
   * @param tableName     table name, used in log and error messages
   * @throws Exception if creating the handler, adding rows or finishing fails
   */
  private void writeBatch(CarbonRowBatch batch, String[] storeLocation, int iteratorIndex,
      int batchIndex, String tableName) throws Exception {
    DataMapWriterListener listener = getDataMapWriterListener(0);
    CarbonFactDataHandlerModel model = CarbonFactDataHandlerModel
        .createCarbonFactDataHandlerModel(configuration, storeLocation, iteratorIndex, batchIndex,
            listener);
    model.setColumnLocalDictGenMap(this.localDictionaryGeneratorMap);
    this.carbonFactHandler = CarbonFactHandlerFactory.createCarbonFactHandler(model);
    carbonFactHandler.initialise();
    processBatch(batch, carbonFactHandler);
    try {
      finish(tableName, carbonFactHandler);
    } finally {
      // finish() always attempts to close the handler, even when it throws. Clear the field so
      // that close() does not finish/close the same handler a second time.
      this.carbonFactHandler = null;
    }
  }

  @Override
  protected String getStepName() {
    return "Data Batch Writer";
  }

  /**
   * Finishes and then closes the given handler and records load statistics. The handler is
   * closed even when finishing fails; the first failure is thrown after the statistics are
   * recorded and a later close failure is attached to it as a suppressed exception.
   *
   * @param tableName   table name, used in log and error messages
   * @param dataHandler the handler to finish and close
   * @throws CarbonDataWriterException if finishing or closing the handler failed
   */
  private void finish(String tableName, CarbonFactHandler dataHandler) {
    CarbonDataWriterException exception = null;
    try {
      dataHandler.finish();
    } catch (Exception e) {
      // Throwing from here would skip closing the handler, so hold the exception and throw it
      // once the handler has been closed.
      LOGGER.error("Failed for table: " + tableName + " in finishing data handler", e);
      exception = new CarbonDataWriterException(
          "Failed for table: " + tableName + " in finishing data handler", e);
    }
    CarbonTimeStatisticsFactory.getLoadStatisticsInstance().recordTotalRecords(rowCounter.get());
    try {
      processingComplete(dataHandler);
    } catch (Exception e) {
      if (null == exception) {
        exception = new CarbonDataWriterException(e);
      } else {
        // Keep the close failure visible instead of dropping it silently.
        exception.addSuppressed(e);
      }
    }
    CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
        .recordDictionaryValue2MdkAdd2FileTime(CarbonTablePath.DEPRECATED_PARTITION_ID,
            System.currentTimeMillis());
    CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
        .recordMdkGenerateTotalTime(CarbonTablePath.DEPRECATED_PARTITION_ID,
            System.currentTimeMillis());
    if (null != exception) {
      throw exception;
    }
  }

  /**
   * Closes the given handler, if any.
   *
   * @param dataHandler the handler to close; may be {@code null}
   * @throws CarbonDataLoadingException if closing the handler fails
   */
  private void processingComplete(CarbonFactHandler dataHandler) {
    if (null != dataHandler) {
      try {
        dataHandler.closeHandler();
      } catch (Exception e) {
        LOGGER.error(e.getMessage(), e);
        throw new CarbonDataLoadingException(
            "There is an unexpected error while closing data handler", e);
      }
    }
  }

  /**
   * Adds every row of the batch to the handler and then adds the number of rows to the step's
   * row counter. The batch is closed even when adding a row fails; the row counter is only
   * updated when the whole batch was added successfully.
   *
   * @param batch       the batch to drain
   * @param dataHandler the handler receiving the rows
   * @throws Exception if adding a row or closing the batch fails
   */
  private void processBatch(CarbonRowBatch batch, CarbonFactHandler dataHandler) throws Exception {
    int batchSize = 0;
    try {
      while (batch.hasNext()) {
        CarbonRow row = batch.next();
        dataHandler.addDataToStore(row);
        batchSize++;
      }
    } finally {
      // Close the batch on the failure path as well, so it is not left open.
      batch.close();
    }
    rowCounter.getAndAdd(batchSize);
  }

  /**
   * Closes the step. If a handler is still pending (a failure interrupted a batch before it was
   * finished), it is finished and closed here; the handler is closed even when finishing throws.
   */
  @Override
  public void close() {
    if (!closed) {
      super.close();
      CarbonFactHandler pendingHandler = this.carbonFactHandler;
      if (null != pendingHandler) {
        this.carbonFactHandler = null;
        try {
          pendingHandler.finish();
        } finally {
          pendingHandler.closeHandler();
        }
      }
    }
  }
}