/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.asterix.metadata.utils;
import static org.apache.asterix.common.utils.IdentifierUtil.dataset;
import java.io.DataOutput;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import org.apache.asterix.builders.IARecordBuilder;
import org.apache.asterix.builders.RecordBuilder;
import org.apache.asterix.column.util.ColumnSecondaryIndexSchemaUtil;
import org.apache.asterix.common.config.DatasetConfig;
import org.apache.asterix.common.config.DatasetConfig.DatasetType;
import org.apache.asterix.common.context.CorrelatedPrefixMergePolicyFactory;
import org.apache.asterix.common.context.IStorageComponentProvider;
import org.apache.asterix.common.context.ITransactionSubsystemProvider;
import org.apache.asterix.common.context.TransactionSubsystemProvider;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.exceptions.ACIDException;
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.common.metadata.DataverseName;
import org.apache.asterix.common.transactions.IRecoveryManager;
import org.apache.asterix.formats.base.IDataFormat;
import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
import org.apache.asterix.formats.nontagged.TypeTraitProvider;
import org.apache.asterix.metadata.IDatasetDetails;
import org.apache.asterix.metadata.MetadataManager;
import org.apache.asterix.metadata.MetadataTransactionContext;
import org.apache.asterix.metadata.declared.MetadataProvider;
import org.apache.asterix.metadata.entities.CompactionPolicy;
import org.apache.asterix.metadata.entities.Dataset;
import org.apache.asterix.metadata.entities.Dataverse;
import org.apache.asterix.metadata.entities.Index;
import org.apache.asterix.metadata.entities.InternalDatasetDetails;
import org.apache.asterix.metadata.entities.NodeGroup;
import org.apache.asterix.om.base.AMutableString;
import org.apache.asterix.om.base.AString;
import org.apache.asterix.om.types.ARecordType;
import org.apache.asterix.om.types.BuiltinType;
import org.apache.asterix.om.types.IAType;
import org.apache.asterix.om.types.visitor.SimpleStringBuilderForIATypeVisitor;
import org.apache.asterix.runtime.operators.LSMPrimaryUpsertOperatorDescriptor;
import org.apache.asterix.runtime.projection.DataProjectionFiltrationInfo;
import org.apache.asterix.runtime.utils.RuntimeUtils;
import org.apache.asterix.transaction.management.opcallbacks.PrimaryIndexInstantSearchOperationCallbackFactory;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint;
import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraintHelper;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.algebricks.common.utils.Pair;
import org.apache.hyracks.algebricks.data.IBinaryComparatorFactoryProvider;
import org.apache.hyracks.api.dataflow.IOperatorDescriptor;
import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFactory;
import org.apache.hyracks.api.dataflow.value.IMissingWriterFactory;
import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
import org.apache.hyracks.api.dataflow.value.ITuplePartitionerFactory;
import org.apache.hyracks.api.dataflow.value.ITypeTraits;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.FileSplit;
import org.apache.hyracks.api.job.JobSpecification;
import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
import org.apache.hyracks.dataflow.common.data.marshalling.IntegerSerializerDeserializer;
import org.apache.hyracks.dataflow.common.data.partition.FieldHashPartitionerFactory;
import org.apache.hyracks.dataflow.std.file.IFileSplitProvider;
import org.apache.hyracks.dataflow.std.misc.ConstantTupleSourceOperatorDescriptor;
import org.apache.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor;
import org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
import org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
import org.apache.hyracks.storage.am.common.build.IndexBuilderFactory;
import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
import org.apache.hyracks.storage.am.common.dataflow.IndexCreateOperatorDescriptor;
import org.apache.hyracks.storage.am.common.dataflow.IndexDataflowHelperFactory;
import org.apache.hyracks.storage.am.common.dataflow.IndexDropOperatorDescriptor;
import org.apache.hyracks.storage.am.common.impls.DefaultTupleProjectorFactory;
import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
import org.apache.hyracks.storage.am.lsm.common.dataflow.LSMTreeIndexCompactOperatorDescriptor;
import org.apache.hyracks.storage.common.IResourceFactory;
import org.apache.hyracks.storage.common.projection.ITupleProjectorFactory;
import org.apache.hyracks.util.LogRedactionUtil;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
public class DatasetUtil {
private static final Logger LOGGER = LogManager.getLogger();
/*
* Dataset related operations
*/
public static final byte OP_UPSERT = 0x03;
private DatasetUtil() {
}
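/**
 * Returns the filter source indicator of the {@code dataset}: 0 if the filter field comes from the data record,
 * 1 if it comes from the meta record. The dataset is expected to be an internal dataset.
 */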
public static Integer getFilterSourceIndicator(Dataset dataset) {
return ((InternalDatasetDetails) dataset.getDatasetDetails()).getFilterSourceIndicator();
}
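/**
 * Returns the filter field of the {@code dataset}, or {@code null} if the dataset has no filter. The dataset is
 * expected to be an internal dataset.
 */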
public static List<String> getFilterField(Dataset dataset) {
return ((InternalDatasetDetails) dataset.getDatasetDetails()).getFilterField();
}
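/**
 * Computes the binary comparator factory of the {@code dataset}'s filter field, or returns {@code null} for
 * external datasets and datasets without a filter.
 */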
public static IBinaryComparatorFactory[] computeFilterBinaryComparatorFactories(Dataset dataset,
ARecordType recordType, ARecordType metaType, IBinaryComparatorFactoryProvider comparatorFactoryProvider)
throws AlgebricksException {
if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
return null;
}
Integer filterFieldSourceIndicator = getFilterSourceIndicator(dataset);
List<String> filterField = getFilterField(dataset);
if (filterField == null) {
return null;
}
IBinaryComparatorFactory[] bcfs = new IBinaryComparatorFactory[1];
ARecordType itemType = filterFieldSourceIndicator == 0 ? recordType : metaType;
IAType type = itemType.getSubFieldType(filterField);
bcfs[0] = comparatorFactoryProvider.getBinaryComparatorFactory(type, true);
return bcfs;
}
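/**
 * Computes the type traits of the {@code dataset}'s filter field, or returns {@code null} for external datasets
 * and datasets without a filter.
 */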
public static ITypeTraits[] computeFilterTypeTraits(Dataset dataset, ARecordType recordType, ARecordType metaType)
throws AlgebricksException {
if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
return null;
}
Integer filterFieldSourceIndicator = getFilterSourceIndicator(dataset);
List<String> filterField = getFilterField(dataset);
if (filterField == null) {
return null;
}
ARecordType itemType = filterFieldSourceIndicator == 0 ? recordType : metaType;
ITypeTraits[] typeTraits = new ITypeTraits[1];
IAType type = itemType.getSubFieldType(filterField);
typeTraits[0] = TypeTraitProvider.INSTANCE.getTypeTrait(type);
return typeTraits;
}
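/**
 * Returns the position of the filter field in the physical tuple, which follows the primary keys and the record
 * (and meta, if present) fields. Returns {@code null} for external datasets and datasets without a filter.
 */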
public static int[] createFilterFields(Dataset dataset) {
if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
return null;
}
List<String> filterField = getFilterField(dataset);
if (filterField == null) {
return null;
}
List<List<String>> partitioningKeys = dataset.getPrimaryKeys();
int numKeys = partitioningKeys.size();
int[] filterFields = new int[1];
int valueFields = dataset.hasMetaPart() ? 2 : 1;
filterFields[0] = numKeys + valueFields;
return filterFields;
}
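/**
 * Returns the positions of the BTree fields (primary keys plus the record and, if present, meta fields) for a
 * dataset that has a filter. Returns {@code null} for external datasets and datasets without a filter.
 */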
public static int[] createBTreeFieldsWhenThereisAFilter(Dataset dataset) {
if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
return null;
}
List<String> filterField = getFilterField(dataset);
if (filterField == null) {
return null;
}
List<List<String>> partitioningKeys = dataset.getPrimaryKeys();
int valueFields = dataset.hasMetaPart() ? 2 : 1;
int[] btreeFields = new int[partitioningKeys.size() + valueFields];
for (int i = 0; i < btreeFields.length; ++i) {
btreeFields[i] = i;
}
return btreeFields;
}
/**
* Returns the primary key source indicators of the {@code dataset} or {@code null} if the dataset does not have
* primary key source indicators (e.g. external datasets)
*/
public static List<Integer> getKeySourceIndicators(Dataset dataset) {
IDatasetDetails datasetDetails = dataset.getDatasetDetails();
if (datasetDetails.getDatasetType() == DatasetType.INTERNAL) {
return ((InternalDatasetDetails) datasetDetails).getKeySourceIndicator();
}
return null;
}
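/**
 * Returns the position of {@code fieldExpr} among the {@code dataset}'s partitioning (primary) keys, taking into
 * account whether the field comes from the data record or the meta record, or -1 if the field is not a
 * partitioning key.
 */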
public static int getPositionOfPartitioningKeyField(Dataset dataset, List<String> fieldExpr,
boolean fieldFromMeta) {
List<Integer> keySourceIndicator = null;
IDatasetDetails datasetDetails = dataset.getDatasetDetails();
if (datasetDetails.getDatasetType() == DatasetType.INTERNAL) {
keySourceIndicator = ((InternalDatasetDetails) datasetDetails).getKeySourceIndicator();
}
List<List<String>> partitioningKeys = dataset.getPrimaryKeys();
for (int i = 0; i < partitioningKeys.size(); i++) {
List<String> partitioningKey = partitioningKeys.get(i);
if (partitioningKey.equals(fieldExpr) && keySourceMatches(keySourceIndicator, i, fieldFromMeta)) {
return i;
}
}
return -1;
}
/**
 * Once a field name has matched a key name (by name comparison alone), this method verifies that the field also
 * comes from the same record as the key (data record or meta record). For example, a field may share its name
 * with a key yet come from the data record while the key comes from the meta record, in which case it is not
 * that key.
 *
 * @param keySourceIndicator indicates where the key comes from: 1 for the meta record, 0 for the data record
 * @param keyIndex the key index the field is checked against
 * @param fieldFromMeta whether the field comes from the meta record or the data record
 * @return true if the key source matches the field source. Otherwise, false.
 */
private static boolean keySourceMatches(List<Integer> keySourceIndicator, int keyIndex, boolean fieldFromMeta) {
if (keySourceIndicator != null) {
return (fieldFromMeta && keySourceIndicator.get(keyIndex) == 1)
|| (!fieldFromMeta && keySourceIndicator.get(keyIndex) == 0);
}
return true;
}
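/**
 * Instantiates the merge (compaction) policy factory of the {@code dataset} by looking up its compaction policy
 * in the metadata and loading the factory class reflectively. For the correlated-prefix merge policy, the dataset
 * id is added to the policy properties.
 */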
public static Pair<ILSMMergePolicyFactory, Map<String, String>> getMergePolicyFactory(Dataset dataset,
MetadataTransactionContext mdTxnCtx) throws AlgebricksException {
String policyName = dataset.getCompactionPolicy();
CompactionPolicy compactionPolicy = MetadataManager.INSTANCE.getCompactionPolicy(mdTxnCtx,
MetadataConstants.METADATA_DATAVERSE_NAME, policyName);
String compactionPolicyFactoryClassName = compactionPolicy.getClassName();
ILSMMergePolicyFactory mergePolicyFactory;
Map<String, String> properties = dataset.getCompactionPolicyProperties();
try {
mergePolicyFactory = (ILSMMergePolicyFactory) Class.forName(compactionPolicyFactoryClassName).newInstance();
if (mergePolicyFactory.getName().compareTo(CorrelatedPrefixMergePolicyFactory.NAME) == 0) {
properties.put(CorrelatedPrefixMergePolicyFactory.KEY_DATASET_ID,
Integer.toString(dataset.getDatasetId()));
}
} catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) {
throw new AlgebricksException(e);
}
return new Pair<>(mergePolicyFactory, properties);
}
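/**
 * Serializes a (name, value) string pair into {@code out} as a record of the given {@code recordType}, with the
 * name in field 0 and the value in field 1.
 * <p>
 * A minimal usage sketch (the property record type here is illustrative):
 * <pre>{@code
 * ArrayBackedValueStorage buffer = new ArrayBackedValueStorage();
 * DatasetUtil.writePropertyTypeRecord("max-component-count", "3", buffer.getDataOutput(), propertyRecordType);
 * }</pre>
 */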
public static void writePropertyTypeRecord(String name, String value, DataOutput out, ARecordType recordType)
throws HyracksDataException {
IARecordBuilder propertyRecordBuilder = new RecordBuilder();
ArrayBackedValueStorage fieldValue = new ArrayBackedValueStorage();
propertyRecordBuilder.reset(recordType);
AMutableString aString = new AMutableString("");
ISerializerDeserializer<AString> stringSerde =
SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ASTRING);
// write field 0
fieldValue.reset();
aString.setValue(name);
stringSerde.serialize(aString, fieldValue.getDataOutput());
propertyRecordBuilder.addField(0, fieldValue);
// write field 1
fieldValue.reset();
aString.setValue(value);
stringSerde.serialize(aString, fieldValue.getDataOutput());
propertyRecordBuilder.addField(1, fieldValue);
propertyRecordBuilder.write(out, true);
}
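/**
 * Returns the meta record type of the {@code dataset}, or {@code null} if the dataset has no meta part.
 */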
public static ARecordType getMetaType(MetadataProvider metadataProvider, Dataset dataset)
throws AlgebricksException {
if (dataset.hasMetaPart()) {
return (ARecordType) metadataProvider.findType(dataset.getMetaItemTypeDataverseName(),
dataset.getMetaItemTypeName());
}
return null;
}
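/**
 * Creates a job specification that drops the primary index of the {@code dataset}. For external datasets an
 * empty job specification is returned.
 */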
public static JobSpecification dropDatasetJobSpec(Dataset dataset, MetadataProvider metadataProvider)
throws AlgebricksException, ACIDException {
return dropDatasetJobSpec(dataset, metadataProvider, Collections.emptySet());
}
public static JobSpecification dropDatasetJobSpec(Dataset dataset, MetadataProvider metadataProvider,
Set<IndexDropOperatorDescriptor.DropOption> options) throws AlgebricksException, ACIDException {
LOGGER.info("DROP DATASET: " + dataset);
if (dataset.getDatasetType() == DatasetType.EXTERNAL) {
return RuntimeUtils.createJobSpecification(metadataProvider.getApplicationContext());
}
JobSpecification specPrimary = RuntimeUtils.createJobSpecification(metadataProvider.getApplicationContext());
Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
metadataProvider.getSplitProviderAndConstraints(dataset);
IIndexDataflowHelperFactory indexHelperFactory = new IndexDataflowHelperFactory(
metadataProvider.getStorageComponentProvider().getStorageManager(), splitsAndConstraint.first);
IndexDropOperatorDescriptor primaryBtreeDrop =
new IndexDropOperatorDescriptor(specPrimary, indexHelperFactory, options);
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(specPrimary, primaryBtreeDrop,
splitsAndConstraint.second);
specPrimary.addRoot(primaryBtreeDrop);
return specPrimary;
}
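/**
 * Creates a job specification that builds the primary index of the {@code dataset}: the local resource factory is
 * derived from the dataset's merge policy and item/meta types, and an index-create operator is constrained to the
 * dataset's partitions.
 */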
public static JobSpecification createDatasetJobSpec(Dataset dataset, MetadataProvider metadataProvider)
throws AlgebricksException {
Index index = IndexUtil.getPrimaryIndex(dataset);
ARecordType itemType = (ARecordType) metadataProvider.findType(dataset);
// get meta item type
ARecordType metaItemType = null;
if (dataset.hasMetaPart()) {
metaItemType = (ARecordType) metadataProvider.findMetaType(dataset);
}
JobSpecification spec = RuntimeUtils.createJobSpecification(metadataProvider.getApplicationContext());
Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
metadataProvider.getSplitProviderAndConstraints(dataset);
FileSplit[] fs = splitsAndConstraint.first.getFileSplits();
StringBuilder sb = new StringBuilder();
for (int i = 0; i < fs.length; i++) {
sb.append(fs[i]).append(' ');
}
LOGGER.info("CREATING File Splits: " + sb);
Pair<ILSMMergePolicyFactory, Map<String, String>> compactionInfo =
DatasetUtil.getMergePolicyFactory(dataset, metadataProvider.getMetadataTxnContext());
// prepare a LocalResourceMetadata which will be stored in NC's local resource
// repository
IResourceFactory resourceFactory = dataset.getResourceFactory(metadataProvider, index, itemType, metaItemType,
compactionInfo.first, compactionInfo.second);
IndexBuilderFactory indexBuilderFactory =
new IndexBuilderFactory(metadataProvider.getStorageComponentProvider().getStorageManager(),
splitsAndConstraint.first, resourceFactory, true);
IndexCreateOperatorDescriptor indexCreateOp = new IndexCreateOperatorDescriptor(spec, indexBuilderFactory);
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, indexCreateOp,
splitsAndConstraint.second);
spec.addRoot(indexCreateOp);
return spec;
}
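/**
 * Creates a job specification that compacts the primary index of dataset {@code datasetName} in the given
 * {@code dataverse}.
 */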
public static JobSpecification compactDatasetJobSpec(Dataverse dataverse, String datasetName,
MetadataProvider metadataProvider) throws AlgebricksException {
DataverseName dataverseName = dataverse.getDataverseName();
Dataset dataset = metadataProvider.findDataset(dataverseName, datasetName);
if (dataset == null) {
throw new AsterixException(ErrorCode.UNKNOWN_DATASET_IN_DATAVERSE, datasetName, dataverseName);
}
JobSpecification spec = RuntimeUtils.createJobSpecification(metadataProvider.getApplicationContext());
Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
metadataProvider.getSplitProviderAndConstraints(dataset);
IIndexDataflowHelperFactory indexHelperFactory = new IndexDataflowHelperFactory(
metadataProvider.getStorageComponentProvider().getStorageManager(), splitsAndConstraint.first);
LSMTreeIndexCompactOperatorDescriptor compactOp =
new LSMTreeIndexCompactOperatorDescriptor(spec, indexHelperFactory);
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, compactOp,
splitsAndConstraint.second);
spec.addRoot(compactOp);
return spec;
}
/**
* Creates a primary index scan operator for a given dataset.
*
* @param spec, the job specification.
* @param metadataProvider, the metadata provider.
* @param dataset, the dataset to scan.
* @return a primary index scan operator.
* @throws AlgebricksException
*/
public static IOperatorDescriptor createPrimaryIndexScanOp(JobSpecification spec, MetadataProvider metadataProvider,
Dataset dataset) throws AlgebricksException {
return createPrimaryIndexScanOp(spec, metadataProvider, dataset, DefaultTupleProjectorFactory.INSTANCE);
}
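/**
 * Variant of {@link #createPrimaryIndexScanOp(JobSpecification, MetadataProvider, Dataset)} that takes a tuple
 * projector factory, which controls how stored tuples are projected by the scan.
 */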
public static IOperatorDescriptor createPrimaryIndexScanOp(JobSpecification spec, MetadataProvider metadataProvider,
Dataset dataset, ITupleProjectorFactory projectorFactory) throws AlgebricksException {
Pair<IFileSplitProvider, AlgebricksPartitionConstraint> primarySplitsAndConstraint =
metadataProvider.getSplitProviderAndConstraints(dataset);
IFileSplitProvider primaryFileSplitProvider = primarySplitsAndConstraint.first;
AlgebricksPartitionConstraint primaryPartitionConstraint = primarySplitsAndConstraint.second;
// -Infinity
int[] lowKeyFields = null;
// +Infinity
int[] highKeyFields = null;
ITransactionSubsystemProvider txnSubsystemProvider = TransactionSubsystemProvider.INSTANCE;
ISearchOperationCallbackFactory searchCallbackFactory = new PrimaryIndexInstantSearchOperationCallbackFactory(
dataset.getDatasetId(), dataset.getPrimaryBloomFilterFields(), txnSubsystemProvider,
IRecoveryManager.ResourceType.LSM_BTREE);
IndexDataflowHelperFactory indexHelperFactory = new IndexDataflowHelperFactory(
metadataProvider.getStorageComponentProvider().getStorageManager(), primaryFileSplitProvider);
BTreeSearchOperatorDescriptor primarySearchOp = new BTreeSearchOperatorDescriptor(spec,
dataset.getPrimaryRecordDescriptor(metadataProvider), lowKeyFields, highKeyFields, true, true,
indexHelperFactory, false, false, null, searchCallbackFactory, null, null, false, null, null, -1, false,
null, null, projectorFactory, null, null);
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, primarySearchOp,
primaryPartitionConstraint);
return primarySearchOp;
}
/**
* Creates a primary index upsert operator for a given dataset.
*
* @param spec, the job specification.
* @param metadataProvider, the metadata provider.
* @param dataset, the dataset to upsert.
* @param inputRecordDesc, the record descriptor for an input tuple.
* @param fieldPermutation, the field permutation according to the input.
* @param missingWriterFactory, the factory for customizing missing value serialization.
* @return a primary index upsert operator and its location constraints.
* @throws AlgebricksException
*/
public static Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> createPrimaryIndexUpsertOp(
JobSpecification spec, MetadataProvider metadataProvider, Dataset dataset, RecordDescriptor inputRecordDesc,
int[] fieldPermutation, IMissingWriterFactory missingWriterFactory) throws AlgebricksException {
int numKeys = dataset.getPrimaryKeys().size();
int numFilterFields = DatasetUtil.getFilterField(dataset) == null ? 0 : 1;
ARecordType itemType = (ARecordType) metadataProvider.findType(dataset);
ARecordType metaItemType = (ARecordType) metadataProvider.findMetaType(dataset);
Index primaryIndex = metadataProvider.getIndex(dataset.getDataverseName(), dataset.getDatasetName(),
dataset.getDatasetName());
Pair<IFileSplitProvider, AlgebricksPartitionConstraint> splitsAndConstraint =
metadataProvider.getSplitProviderAndConstraints(dataset);
// prepare callback
int[] primaryKeyFields = new int[numKeys];
int[] pkFields = new int[numKeys];
for (int i = 0; i < numKeys; i++) {
primaryKeyFields[i] = i;
pkFields[i] = fieldPermutation[i];
}
boolean hasSecondaries =
metadataProvider.getDatasetIndexes(dataset.getDataverseName(), dataset.getDatasetName()).size() > 1;
IStorageComponentProvider storageComponentProvider = metadataProvider.getStorageComponentProvider();
IModificationOperationCallbackFactory modificationCallbackFactory = dataset.getModificationCallbackFactory(
storageComponentProvider, primaryIndex, IndexOperation.UPSERT, primaryKeyFields);
ISearchOperationCallbackFactory searchCallbackFactory = dataset.getSearchCallbackFactory(
storageComponentProvider, primaryIndex, IndexOperation.UPSERT, primaryKeyFields);
IIndexDataflowHelperFactory idfh =
new IndexDataflowHelperFactory(storageComponentProvider.getStorageManager(), splitsAndConstraint.first);
LSMPrimaryUpsertOperatorDescriptor op;
ITypeTraits[] outputTypeTraits = new ITypeTraits[inputRecordDesc.getFieldCount() + 1
+ (dataset.hasMetaPart() ? 2 : 1) + numFilterFields];
ISerializerDeserializer<?>[] outputSerDes = new ISerializerDeserializer[inputRecordDesc.getFieldCount() + 1
+ (dataset.hasMetaPart() ? 2 : 1) + numFilterFields];
IDataFormat dataFormat = metadataProvider.getDataFormat();
int f = 0;
// add the upsert operation var
outputSerDes[f] = dataFormat.getSerdeProvider().getSerializerDeserializer(BuiltinType.AINT8);
outputTypeTraits[f] = dataFormat.getTypeTraitProvider().getTypeTrait(BuiltinType.AINT8);
f++;
// add the previous record
outputSerDes[f] = dataFormat.getSerdeProvider().getSerializerDeserializer(itemType);
outputTypeTraits[f] = dataFormat.getTypeTraitProvider().getTypeTrait(itemType);
f++;
// add the previous meta second
if (dataset.hasMetaPart()) {
outputSerDes[f] = dataFormat.getSerdeProvider().getSerializerDeserializer(metaItemType);
outputTypeTraits[f] = dataFormat.getTypeTraitProvider().getTypeTrait(metaItemType);
f++;
}
// add the previous filter third
int fieldIdx = -1;
Integer filterSourceIndicator = null;
ARecordType filterItemType = null;
if (numFilterFields > 0) {
filterSourceIndicator = DatasetUtil.getFilterSourceIndicator(dataset);
String filterField = DatasetUtil.getFilterField(dataset).get(0);
filterItemType = filterSourceIndicator == 0 ? itemType : metaItemType;
String[] fieldNames = filterItemType.getFieldNames();
int i = 0;
for (; i < fieldNames.length; i++) {
if (fieldNames[i].equals(filterField)) {
break;
}
}
fieldIdx = i;
outputTypeTraits[f] =
dataFormat.getTypeTraitProvider().getTypeTrait(filterItemType.getFieldTypes()[fieldIdx]);
outputSerDes[f] =
dataFormat.getSerdeProvider().getSerializerDeserializer(filterItemType.getFieldTypes()[fieldIdx]);
f++;
}
for (int j = 0; j < inputRecordDesc.getFieldCount(); j++) {
outputTypeTraits[j + f] = inputRecordDesc.getTypeTraits()[j];
outputSerDes[j + f] = inputRecordDesc.getFields()[j];
}
RecordDescriptor outputRecordDesc = new RecordDescriptor(outputSerDes, outputTypeTraits);
// This allows projecting only the indexed fields instead of the entire record
ARecordType requestedType = getPrevRecordType(metadataProvider, dataset, itemType);
ITupleProjectorFactory projectorFactory = IndexUtil.createUpsertTupleProjectorFactory(
dataset.getDatasetFormatInfo(), requestedType, itemType, metaItemType, numKeys);
int numPartitions = MetadataProvider.getNumPartitions(splitsAndConstraint.second);
int[][] partitionsMap = MetadataProvider.getPartitionsMap(numPartitions);
IBinaryHashFunctionFactory[] pkHashFunFactories = dataset.getPrimaryHashFunctionFactories(metadataProvider);
ITuplePartitionerFactory tuplePartitionerFactory =
new FieldHashPartitionerFactory(pkFields, pkHashFunFactories, numPartitions);
op = new LSMPrimaryUpsertOperatorDescriptor(spec, outputRecordDesc, fieldPermutation, idfh,
missingWriterFactory, modificationCallbackFactory, searchCallbackFactory,
dataset.getFrameOpCallbackFactory(metadataProvider), numKeys, filterSourceIndicator, filterItemType,
fieldIdx, hasSecondaries, projectorFactory, tuplePartitionerFactory, partitionsMap);
return new Pair<>(op, splitsAndConstraint.second);
}
/**
* Returns a type that contains indexed fields for columnar datasets.
* The type is used to retrieve the previous record with only the indexed fields -- minimizing the
* I/O cost for point lookups.
*
* @param metadataProvider metadata provider
* @param dataset the dataset to upsert to
* @param itemType dataset type
* @return a type with the requested fields
*/
private static ARecordType getPrevRecordType(MetadataProvider metadataProvider, Dataset dataset,
ARecordType itemType) throws AlgebricksException {
if (dataset.getDatasetFormatInfo().getFormat() == DatasetConfig.DatasetFormat.ROW) {
return itemType;
}
// Column
List<Index> secondaryIndexes =
metadataProvider.getDatasetIndexes(dataset.getDataverseName(), dataset.getDatasetName());
List<ARecordType> indexPaths = new ArrayList<>();
for (Index index : secondaryIndexes) {
if (!index.isSecondaryIndex() || index.isPrimaryKeyIndex() || index.isSampleIndex()) {
continue;
}
if (index.getIndexType() == DatasetConfig.IndexType.BTREE) {
Index.ValueIndexDetails indexDetails = (Index.ValueIndexDetails) index.getIndexDetails();
indexPaths.add(indexDetails.getIndexExpectedType());
} else if (index.getIndexType() == DatasetConfig.IndexType.ARRAY) {
Index.ArrayIndexDetails indexDetails = (Index.ArrayIndexDetails) index.getIndexDetails();
indexPaths.add(indexDetails.getIndexExpectedType());
}
}
ARecordType result = indexPaths.isEmpty() ? DataProjectionFiltrationInfo.EMPTY_TYPE
: ColumnSecondaryIndexSchemaUtil.merge(indexPaths);
if (LOGGER.isInfoEnabled() && result != DataProjectionFiltrationInfo.EMPTY_TYPE) {
SimpleStringBuilderForIATypeVisitor schemaPrinter = new SimpleStringBuilderForIATypeVisitor();
StringBuilder builder = new StringBuilder();
result.accept(schemaPrinter, builder);
LOGGER.info("Upsert previous tuple schema: {}", LogRedactionUtil.userData(builder.toString()));
}
return result;
}
/**
* Creates a dummy key provider operator for the primary index scan.
*
* @param spec, the job specification.
* @param dataset, the dataset to scan.
* @param metadataProvider, the metadata provider.
* @return a dummy key provider operator.
* @throws AlgebricksException
*/
public static IOperatorDescriptor createDummyKeyProviderOp(JobSpecification spec, Dataset dataset,
MetadataProvider metadataProvider) throws AlgebricksException {
Pair<IFileSplitProvider, AlgebricksPartitionConstraint> primarySplitsAndConstraint =
metadataProvider.getSplitProviderAndConstraints(dataset);
AlgebricksPartitionConstraint primaryPartitionConstraint = primarySplitsAndConstraint.second;
// Build dummy tuple containing one field with a dummy value inside.
ArrayTupleBuilder tb = new ArrayTupleBuilder(1);
DataOutput dos = tb.getDataOutput();
tb.reset();
try {
// Serialize dummy value into a field.
IntegerSerializerDeserializer.INSTANCE.serialize(0, dos);
} catch (HyracksDataException e) {
throw new AsterixException(e);
}
// Add dummy field.
tb.addFieldEndOffset();
ISerializerDeserializer[] keyRecDescSers = { IntegerSerializerDeserializer.INSTANCE };
RecordDescriptor keyRecDesc = new RecordDescriptor(keyRecDescSers);
ConstantTupleSourceOperatorDescriptor keyProviderOp = new ConstantTupleSourceOperatorDescriptor(spec,
keyRecDesc, tb.getFieldEndOffsets(), tb.getByteArray(), tb.getSize());
AlgebricksPartitionConstraintHelper.setPartitionConstraintInJobSpec(spec, keyProviderOp,
primaryPartitionConstraint);
return keyProviderOp;
}
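/**
 * Returns the fully qualified display name of a dataset, i.e. its dataverse name together with its dataset name.
 */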
public static String getFullyQualifiedDisplayName(Dataset dataset) {
return getFullyQualifiedDisplayName(dataset.getDataverseName(), dataset.getDatasetName());
}
public static String getFullyQualifiedDisplayName(DataverseName dataverseName, String datasetName) {
return MetadataUtil.getFullyQualifiedDisplayName(dataverseName, datasetName);
}
/***
* Creates a node group that is associated with a new dataset.
*
* @param dataverseName,
* the dataverse name of the dataset.
* @param datasetName,
* the name of the dataset.
* @param ncNames,
* the set of node names.
* @param metadataProvider,
* the metadata provider.
* @return the name of the created node group.
* @throws Exception
*/
public static String createNodeGroupForNewDataset(DataverseName dataverseName, String datasetName,
Set<String> ncNames, MetadataProvider metadataProvider) throws Exception {
return createNodeGroupForNewDataset(dataverseName, datasetName, 0L, ncNames, metadataProvider);
}
/***
* Creates a node group that is associated with a new dataset.
*
* @param dataverseName,
* the dataverse name of the dataset.
* @param datasetName,
* the name of the dataset.
* @param rebalanceCount
* , the rebalance count of the dataset.
* @param ncNames,
* the set of node names.
* @param metadataProvider,
* the metadata provider.
* @return the name of the created node group.
* @throws Exception
*/
public static String createNodeGroupForNewDataset(DataverseName dataverseName, String datasetName,
long rebalanceCount, Set<String> ncNames, MetadataProvider metadataProvider) throws Exception {
ICcApplicationContext appCtx = metadataProvider.getApplicationContext();
String nodeGroup = dataverseName.getCanonicalForm() + "." + datasetName
+ (rebalanceCount == 0L ? "" : "_" + rebalanceCount);
MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext();
appCtx.getMetadataLockManager().acquireNodeGroupWriteLock(metadataProvider.getLocks(), nodeGroup);
NodeGroup ng = MetadataManager.INSTANCE.getNodegroup(mdTxnCtx, nodeGroup);
if (ng != null) {
nodeGroup = nodeGroup + "_" + UUID.randomUUID();
appCtx.getMetadataLockManager().acquireNodeGroupWriteLock(metadataProvider.getLocks(), nodeGroup);
}
MetadataManager.INSTANCE.addNodegroup(mdTxnCtx, NodeGroup.createOrdered(nodeGroup, new ArrayList<>(ncNames)));
return nodeGroup;
}
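/**
 * Returns the display name of the given dataset type: "view" for views, otherwise the identifier used for
 * datasets.
 */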
public static String getDatasetTypeDisplayName(DatasetType datasetType) {
return datasetType == DatasetType.VIEW ? "view" : dataset();
}
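/**
 * Returns {@code true} if the given {@code dataset} is not a view.
 */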
public static boolean isNotView(Dataset dataset) {
return dataset.getDatasetType() != DatasetType.VIEW;
}
}