// blob: c044257b5854425a2ee43e28541ef4ca54a24c40 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.carbondata.processing.merger;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datastore.block.SegmentProperties;
import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
import org.apache.carbondata.core.datastore.row.CarbonRow;
import org.apache.carbondata.core.datastore.row.WriteStepRowUtil;
import org.apache.carbondata.core.indexstore.PartitionSpec;
import org.apache.carbondata.core.metadata.SegmentFileStore;
import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
import org.apache.carbondata.core.scan.result.iterator.RawResultIterator;
import org.apache.carbondata.core.scan.wrappers.ByteArrayWrapper;
import org.apache.carbondata.core.util.ByteUtil;
import org.apache.carbondata.processing.exception.SliceMergerException;
import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
import org.apache.carbondata.processing.loading.sort.CarbonPriorityQueue;
import org.apache.carbondata.processing.store.CarbonFactDataHandlerColumnar;
import org.apache.carbondata.processing.store.CarbonFactDataHandlerModel;
import org.apache.carbondata.processing.store.CarbonFactHandler;
import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
import org.apache.log4j.Logger;
/**
* This is the Merger class responsible for the merging of the segments.
*/
/**
 * This is the Merger class responsible for the merging of the segments.
 *
 * <p>It performs a k-way merge of the rows produced by a set of
 * {@link RawResultIterator}s (one per input block/segment) using a priority
 * queue ordered by the rows' sort-column key bytes, and streams the merged
 * rows into a {@link CarbonFactHandler} which writes the compacted segment.
 */
public class RowResultMergerProcessor extends AbstractResultProcessor {

  // Writer that persists merged rows into the target (compacted) segment.
  private CarbonFactHandler dataHandler;

  // Schema/layout info of the segment; also supplies the sort-column value
  // lengths used by the comparator below.
  private SegmentProperties segprop;

  private CarbonLoadModel loadModel;

  // Target partition; when non-null the output is written under the
  // partition location and a segment file is produced on completion.
  private PartitionSpec partitionSpec;

  // No-dictionary and complex columns, needed when converting a merger row
  // into a CarbonRow for the data handler.
  CarbonColumn[] noDicAndComplexColumns;

  /**
   * record holder heap: min-heap of the input iterators, keyed by each
   * iterator's current (not-yet-consumed) row via {@link CarbonMdkeyComparator}.
   */
  private CarbonPriorityQueue<RawResultIterator> recordHolderHeap;

  private static final Logger LOGGER =
      LogServiceFactory.getLogService(RowResultMergerProcessor.class.getName());

  /**
   * Sets up the merge target location and the columnar data handler.
   *
   * @param databaseName database of the table being compacted (not referenced in
   *          this constructor body; kept for caller compatibility)
   * @param tableName table being compacted
   * @param segProp segment properties of the data being merged
   * @param tempStoreLocation local temp directories; created here if absent
   * @param loadModel load model carrying table/segment/timestamp info
   * @param compactionType type of compaction, used to derive data file attributes
   * @param partitionSpec target partition, or null for a non-partitioned write
   * @throws IOException if creating locations or the handler model fails
   */
  public RowResultMergerProcessor(String databaseName,
      String tableName, SegmentProperties segProp, String[] tempStoreLocation,
      CarbonLoadModel loadModel, CompactionType compactionType, PartitionSpec partitionSpec)
      throws IOException {
    this.segprop = segProp;
    this.partitionSpec = partitionSpec;
    this.loadModel = loadModel;
    CarbonDataProcessorUtil.createLocations(tempStoreLocation);
    String carbonStoreLocation;
    if (partitionSpec != null) {
      // Partitioned table: write into a ".tmp" folder (named by the fact
      // timestamp) under the partition location; presumably finalized later
      // by the segment-file write in execute() — TODO confirm against caller.
      carbonStoreLocation =
          partitionSpec.getLocation().toString() + CarbonCommonConstants.FILE_SEPARATOR + loadModel
              .getFactTimeStamp() + ".tmp";
    } else {
      // Non-partitioned: use the standard store location for the new segment.
      carbonStoreLocation = CarbonDataProcessorUtil
          .createCarbonStoreLocation(loadModel.getCarbonDataLoadSchema().getCarbonTable(),
              loadModel.getSegmentId());
    }
    CarbonFactDataHandlerModel carbonFactDataHandlerModel = CarbonFactDataHandlerModel
        .getCarbonFactDataHandlerModel(loadModel,
            loadModel.getCarbonDataLoadSchema().getCarbonTable(), segProp, tableName,
            tempStoreLocation, carbonStoreLocation);
    setDataFileAttributesInModel(loadModel, compactionType, carbonFactDataHandlerModel);
    // Mark this as a compaction flow so the handler applies compaction-specific
    // behavior rather than the normal load path.
    carbonFactDataHandlerModel.setCompactionFlow(true);
    carbonFactDataHandlerModel.setSegmentId(loadModel.getSegmentId());
    carbonFactDataHandlerModel.setBucketId(loadModel.getBucketId());
    this.noDicAndComplexColumns = carbonFactDataHandlerModel.getNoDictAndComplexColumns();
    dataHandler = new CarbonFactDataHandlerColumnar(carbonFactDataHandlerModel);
  }

  /**
   * Builds the priority queue sized to the number of input iterators, ordered
   * by the sort-column key of each iterator's current row.
   */
  private void initRecordHolderHeap(List<RawResultIterator> rawResultIteratorList) {
    // create the List of RawResultIterator.
    recordHolderHeap = new CarbonPriorityQueue<>(rawResultIteratorList.size(),
        new RowResultMergerProcessor.CarbonMdkeyComparator());
  }

  /**
   * Merge function: k-way merges all input iterators and writes the result
   * through the data handler.
   *
   * <p>Both iterator lists are merged through the same heap; the
   * unsorted/sorted split in the parameters is flattened into one list here.
   *
   * @param unsortedResultIteratorList iterators over rows needing sorting
   * @param sortedResultIteratorList iterators over already-sorted rows
   * @return true if the merge completed; on failure the exception is rethrown,
   *         so a false return is effectively unreachable by callers
   * @throws Exception any failure from iteration, writing, or finalization
   */
  public boolean execute(List<RawResultIterator> unsortedResultIteratorList,
      List<RawResultIterator> sortedResultIteratorList) throws Exception {
    List<RawResultIterator> finalIteratorList = new ArrayList<>(unsortedResultIteratorList);
    finalIteratorList.addAll(sortedResultIteratorList);
    initRecordHolderHeap(finalIteratorList);
    boolean mergeStatus = false;
    // Count of iterators currently in the heap that may still have rows.
    int index = 0;
    // Becomes true once the first row is seen; the handler is initialised
    // lazily so an empty merge never opens the writer.
    boolean isDataPresent = false;
    try {
      // add all iterators to the queue (skip ones that are empty up front)
      for (RawResultIterator leaftTupleIterator : finalIteratorList) {
        if (leaftTupleIterator.hasNext()) {
          this.recordHolderHeap.add(leaftTupleIterator);
          index++;
        }
      }
      RawResultIterator iterator = null;
      // While at least two iterators remain, take the smallest current row
      // from the heap top, write it, then restore the heap property. When
      // only one iterator is left there is nothing to compare against, so
      // the loop exits and the remaining iterator is drained below.
      while (index > 1) {
        // peek (not poll) the top iterator: if it still has rows after
        // consuming one, only its key changed, so siftTopDown is cheaper
        // than a poll/re-add cycle.
        iterator = this.recordHolderHeap.peek();
        Object[] convertedRow = iterator.next();
        if (null == convertedRow) {
          // iterator yielded no row despite hasNext: drop it from the heap
          index--;
          iterator.close();
          this.recordHolderHeap.poll();
          continue;
        }
        if (!isDataPresent) {
          dataHandler.initialise();
          isDataPresent = true;
        }
        // get the mdkey
        addRow(convertedRow);
        // if there is no record in the leaf and all then decrement the
        // index
        if (!iterator.hasNext()) {
          index--;
          iterator.close();
          this.recordHolderHeap.poll();
          continue;
        }
        // maintain heap: the top iterator's current row changed
        this.recordHolderHeap.siftTopDown();
      }
      // if record holder is not empty then iterator the slice holder from
      // heap: at most one iterator remains; drain it sequentially since no
      // comparisons are needed any more.
      iterator = this.recordHolderHeap.poll();
      if (null != iterator) {
        while (true) {
          Object[] convertedRow = iterator.next();
          if (null == convertedRow) {
            iterator.close();
            break;
          }
          // do it only once
          if (!isDataPresent) {
            dataHandler.initialise();
            isDataPresent = true;
          }
          addRow(convertedRow);
          // check if leaf contains no record
          if (!iterator.hasNext()) {
            break;
          }
        }
      }
      if (isDataPresent) {
        // flush remaining pages and finish the carbondata files
        this.dataHandler.finish();
      }
      mergeStatus = true;
    } catch (Exception e) {
      mergeStatus = false;
      LOGGER.error(e.getLocalizedMessage(), e);
      throw e;
    } finally {
      try {
        if (isDataPresent) {
          this.dataHandler.closeHandler();
        }
        // For partitioned tables, record the written files in a segment file
        // even on the failure path (runs in finally). NOTE(review): on failure
        // this may register partial output — confirm intended.
        if (partitionSpec != null) {
          SegmentFileStore.writeSegmentFile(loadModel.getTablePath(), loadModel.getTaskNo(),
              partitionSpec.getLocation().toString(), loadModel.getFactTimeStamp() + "",
              partitionSpec.getPartitions());
        }
      } catch (CarbonDataWriterException | IOException e) {
        // NOTE: if the try block is already unwinding with an exception,
        // this throw from finally replaces (masks) the original one.
        mergeStatus = false;
        throw e;
      }
    }
    return mergeStatus;
  }

  /** Releases the data handler; safe to call whether or not execute() ran. */
  @Override
  public void close() {
    // close data handler
    if (null != dataHandler) {
      dataHandler.closeHandler();
    }
  }

  /**
   * Below method will be used to add sorted row
   *
   * @param carbonTuple merger-format row; element 0 is the ByteArrayWrapper key
   * @throws SliceMergerException if the data handler fails to accept the row
   */
  private void addRow(Object[] carbonTuple) throws SliceMergerException {
    CarbonRow row = WriteStepRowUtil.fromMergerRow(carbonTuple, segprop, noDicAndComplexColumns);
    try {
      this.dataHandler.addDataToStore(row);
    } catch (CarbonDataWriterException e) {
      throw new SliceMergerException("Problem in merging the slice", e);
    }
  }

  /**
   * Comparator class for comparing 2 raw row result.
   *
   * <p>Compares the iterators' CURRENT rows (via fetchConverted, which does
   * not advance the iterator) on the sort columns only: fixed-length
   * dictionary bytes first at their running offset, then no-dictionary keys
   * by index. NOTE(review): returning 0 when either row is null makes the
   * ordering inconsistent with the general Comparator contract; acceptable
   * here only because the heap tolerates it — confirm before reuse.
   */
  private class CarbonMdkeyComparator implements Comparator<RawResultIterator> {

    // Per-dimension value lengths; truncated below to the sort columns only.
    int[] columnValueSizes = segprop.createDimColumnValueLength();

    public CarbonMdkeyComparator() {
      initSortColumns();
    }

    // Keep only the leading sort-column lengths: non-sort dimensions must not
    // influence the merge order.
    private void initSortColumns() {
      int numberOfSortColumns = segprop.getNumberOfSortColumns();
      if (numberOfSortColumns != columnValueSizes.length) {
        int[] sortColumnValueSizes = new int[numberOfSortColumns];
        System.arraycopy(columnValueSizes, 0, sortColumnValueSizes, 0, numberOfSortColumns);
        this.columnValueSizes = sortColumnValueSizes;
      }
    }

    @Override
    public int compare(RawResultIterator o1, RawResultIterator o2) {
      Object[] row1 = o1.fetchConverted();
      Object[] row2 = o2.fetchConverted();
      if (null == row1 || null == row2) {
        // treat a missing row as equal; see class-level note on the contract
        return 0;
      }
      ByteArrayWrapper key1 = (ByteArrayWrapper) row1[0];
      ByteArrayWrapper key2 = (ByteArrayWrapper) row2[0];
      int compareResult = 0;
      // running byte offset into the packed dictionary key
      int dictionaryKeyOffset = 0;
      byte[] dimCols1 = key1.getDictionaryKey();
      byte[] dimCols2 = key2.getDictionaryKey();
      // index into the separate array of no-dictionary keys
      int noDicIndex = 0;
      for (int eachColumnValueSize : columnValueSizes) {
        // case of dictionary cols: fixed-length byte compare at the offset
        if (eachColumnValueSize > 0) {
          compareResult = ByteUtil.UnsafeComparer.INSTANCE
              .compareTo(dimCols1, dictionaryKeyOffset, eachColumnValueSize, dimCols2,
                  dictionaryKeyOffset, eachColumnValueSize);
          dictionaryKeyOffset += eachColumnValueSize;
        } else { // case of no dictionary: variable-length key by position
          byte[] noDictionaryDim1 = key1.getNoDictionaryKeyByIndex(noDicIndex);
          byte[] noDictionaryDim2 = key2.getNoDictionaryKeyByIndex(noDicIndex);
          compareResult =
              ByteUtil.UnsafeComparer.INSTANCE.compareTo(noDictionaryDim1, noDictionaryDim2);
          noDicIndex++;
        }
        if (0 != compareResult) {
          // first differing sort column decides the order
          return compareResult;
        }
      }
      return 0;
    }
  }
}