| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.asterix.app.bootstrap; |
| |
| import java.io.File; |
| import java.rmi.RemoteException; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collections; |
| import java.util.List; |
| import java.util.Map; |
| |
| import org.apache.asterix.app.cc.CcApplicationContext; |
| import org.apache.asterix.app.nc.NCAppRuntimeContext; |
| import org.apache.asterix.app.nc.TransactionSubsystem; |
| import org.apache.asterix.common.config.TransactionProperties; |
| import org.apache.asterix.common.context.DatasetLifecycleManager; |
| import org.apache.asterix.common.context.IStorageComponentProvider; |
| import org.apache.asterix.common.context.TransactionSubsystemProvider; |
| import org.apache.asterix.common.dataflow.ICcApplicationContext; |
| import org.apache.asterix.common.dataflow.LSMInsertDeleteOperatorNodePushable; |
| import org.apache.asterix.common.exceptions.ACIDException; |
| import org.apache.asterix.common.metadata.MetadataUtil; |
| import org.apache.asterix.common.metadata.Namespace; |
| import org.apache.asterix.common.metadata.NamespacePathResolver; |
| import org.apache.asterix.common.transactions.ITransactionManager; |
| import org.apache.asterix.common.transactions.TxnId; |
| import org.apache.asterix.dataflow.data.nontagged.MissingWriterFactory; |
| import org.apache.asterix.file.StorageComponentProvider; |
| import org.apache.asterix.formats.nontagged.BinaryHashFunctionFactoryProvider; |
| import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider; |
| import org.apache.asterix.formats.nontagged.TypeTraitProvider; |
| import org.apache.asterix.metadata.MetadataManager; |
| import org.apache.asterix.metadata.MetadataTransactionContext; |
| import org.apache.asterix.metadata.declared.MetadataProvider; |
| import org.apache.asterix.metadata.entities.Dataset; |
| import org.apache.asterix.metadata.entities.Dataverse; |
| import org.apache.asterix.metadata.entities.Index; |
| import org.apache.asterix.metadata.utils.DatasetUtil; |
| import org.apache.asterix.metadata.utils.SplitsAndConstraintsUtil; |
| import org.apache.asterix.om.types.ARecordType; |
| import org.apache.asterix.om.types.BuiltinType; |
| import org.apache.asterix.om.types.IAType; |
| import org.apache.asterix.runtime.formats.FormatUtils; |
| import org.apache.asterix.runtime.formats.NonTaggedDataFormat; |
| import org.apache.asterix.runtime.job.listener.JobEventListenerFactory; |
| import org.apache.asterix.runtime.operators.LSMIndexBulkLoadOperatorDescriptor.BulkLoadUsage; |
| import org.apache.asterix.runtime.operators.LSMIndexBulkLoadOperatorNodePushable; |
| import org.apache.asterix.runtime.operators.LSMPrimaryInsertOperatorNodePushable; |
| import org.apache.asterix.runtime.operators.LSMPrimaryUpsertOperatorNodePushable; |
| import org.apache.asterix.test.runtime.ExecutionTestUtil; |
| import org.apache.asterix.transaction.management.runtime.CommitRuntime; |
| import org.apache.asterix.transaction.management.service.logging.LogReader; |
| import org.apache.commons.lang3.tuple.Pair; |
| import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; |
| import org.apache.hyracks.algebricks.data.ISerializerDeserializerProvider; |
| import org.apache.hyracks.algebricks.data.ITypeTraitProvider; |
| import org.apache.hyracks.algebricks.runtime.base.IPushRuntime; |
| import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory; |
| import org.apache.hyracks.algebricks.runtime.operators.base.SinkRuntimeFactory; |
| import org.apache.hyracks.algebricks.runtime.operators.std.AssignRuntimeFactory; |
| import org.apache.hyracks.algebricks.runtime.operators.std.EmptyTupleSourceRuntimeFactory; |
| import org.apache.hyracks.api.comm.IFrameWriter; |
| import org.apache.hyracks.api.comm.VSizeFrame; |
| import org.apache.hyracks.api.config.IOption; |
| import org.apache.hyracks.api.context.IHyracksJobletContext; |
| import org.apache.hyracks.api.context.IHyracksTaskContext; |
| import org.apache.hyracks.api.dataflow.ActivityId; |
| import org.apache.hyracks.api.dataflow.OperatorDescriptorId; |
| import org.apache.hyracks.api.dataflow.TaskAttemptId; |
| import org.apache.hyracks.api.dataflow.TaskId; |
| import org.apache.hyracks.api.dataflow.value.IBinaryHashFunctionFactory; |
| import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider; |
| import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer; |
| import org.apache.hyracks.api.dataflow.value.ITuplePartitionerFactory; |
| import org.apache.hyracks.api.dataflow.value.ITypeTraits; |
| import org.apache.hyracks.api.dataflow.value.RecordDescriptor; |
| import org.apache.hyracks.api.exceptions.HyracksDataException; |
| import org.apache.hyracks.api.io.FileSplit; |
| import org.apache.hyracks.api.job.JobId; |
| import org.apache.hyracks.api.job.JobSpecification; |
| import org.apache.hyracks.api.util.HyracksConstants; |
| import org.apache.hyracks.dataflow.common.data.partition.FieldHashPartitionerFactory; |
| import org.apache.hyracks.dataflow.common.utils.TaskUtil; |
| import org.apache.hyracks.dataflow.std.file.ConstantFileSplitProvider; |
| import org.apache.hyracks.dataflow.std.file.IFileSplitProvider; |
| import org.apache.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescriptor; |
| import org.apache.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorNodePushable; |
| import org.apache.hyracks.storage.am.common.api.IIndexBuilder; |
| import org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFactory; |
| import org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory; |
| import org.apache.hyracks.storage.am.common.build.IndexBuilderFactory; |
| import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory; |
| import org.apache.hyracks.storage.am.common.dataflow.IndexDataflowHelperFactory; |
| import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory; |
| import org.apache.hyracks.storage.am.common.impls.NoOpTupleProjectorFactory; |
| import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation; |
| import org.apache.hyracks.storage.am.lsm.common.api.IFrameOperationCallbackFactory; |
| import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory; |
| import org.apache.hyracks.storage.am.lsm.common.impls.NoMergePolicyFactory; |
| import org.apache.hyracks.storage.common.IResourceFactory; |
| import org.apache.hyracks.storage.common.IStorageManager; |
| import org.apache.hyracks.test.support.TestUtils; |
| import org.apache.hyracks.util.TestUtil; |
| import org.apache.hyracks.util.file.FileUtil; |
| import org.apache.logging.log4j.LogManager; |
| import org.apache.logging.log4j.Logger; |
| import org.mockito.Mockito; |
| |
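| /** |
| * Test harness that boots an AsterixDB node controller inside an in-process test cluster and |
| * hand-assembles Hyracks push runtimes (insert, delete, upsert, bulk-load, and full-scan |
| * pipelines) against a dataset's primary and secondary indexes, bypassing the query compiler. |
| */ |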
| public class TestNodeController { |
| protected static final Logger LOGGER = LogManager.getLogger(); |
| |
| protected static final String PATH_ACTUAL = "unittest" + File.separator; |
| protected static final String PATH_BASE = FileUtil.joinPath("src", "test", "resources", "nodetests"); |
| |
| protected static final String TEST_CONFIG_FILE_NAME = "src/main/resources/cc.conf"; |
| protected static TransactionProperties txnProperties; |
| private static final boolean CLEANUP_ON_START = true; |
| private static final boolean CLEANUP_ON_STOP = true; |
| |
| // Constants |
| public static final int DEFAULT_HYRACKS_CC_CLIENT_PORT = 1098; |
| public static final int DEFAULT_HYRACKS_CC_CLUSTER_PORT = 1099; |
| public static final int KB32 = 32768; |
| public static final double BLOOM_FILTER_FALSE_POSITIVE_RATE = 0.01; |
| public static final TransactionSubsystemProvider TXN_SUBSYSTEM_PROVIDER = TransactionSubsystemProvider.INSTANCE; |
| // Mutables |
| private long jobCounter = 100L; |
| private final String testConfigFileName; |
| private final boolean runHDFS; |
| private final List<Pair<IOption, Object>> options = new ArrayList<>(); |
| |
| public TestNodeController(String testConfigFileName, boolean runHDFS) { |
| this.testConfigFileName = testConfigFileName; |
| this.runHDFS = runHDFS; |
| } |
| |
| public void init() throws Exception { |
| init(CLEANUP_ON_START); |
| } |
| |
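| /** |
| * Boots the test cluster using the config file supplied to the constructor (or the default |
| * {@code src/main/resources/cc.conf}), optionally cleaning up state left by a previous run. |
| */ |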
| public void init(boolean cleanupOnStart) throws Exception { |
| try { |
| File outdir = new File(PATH_ACTUAL); |
| outdir.mkdirs(); |
| ExecutionTestUtil.setUp(cleanupOnStart, |
| testConfigFileName == null ? TEST_CONFIG_FILE_NAME : testConfigFileName, |
| ExecutionTestUtil.integrationUtil, runHDFS, options); |
| } catch (Throwable th) { |
| LOGGER.error("failed to initialize the test node controller", th); |
| throw th; |
| } |
| } |
| |
| public void deInit() throws Exception { |
| deInit(CLEANUP_ON_STOP); |
| } |
| |
| public void deInit(boolean cleanupOnStop) throws Exception { |
| ExecutionTestUtil.tearDown(cleanupOnStop, runHDFS); |
| } |
| |
| public void setOpts(List<Pair<IOption, Object>> opts) { |
| options.addAll(opts); |
| } |
| |
| public void clearOpts() { |
| options.clear(); |
| ExecutionTestUtil.integrationUtil.clearOptions(); |
| } |
| |
| public TxnId getTxnJobId(IHyracksTaskContext ctx) { |
| return getTxnJobId(ctx.getJobletContext().getJobId()); |
| } |
| |
| public TxnId getTxnJobId(JobId jobId) { |
| return new TxnId(jobId.getId()); |
| } |
| |
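| /** |
| * Creates a bulk-load operator that populates {@code secondaryIndex} from the dataset's |
| * primary index, with its output already wired to a sink. The returned |
| * {@link SecondaryIndexInfo} exposes the record descriptor and file splits of the target |
| * index. |
| */ |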
| public Pair<SecondaryIndexInfo, LSMIndexBulkLoadOperatorNodePushable> getBulkLoadSecondaryOperator( |
| IHyracksTaskContext ctx, Dataset dataset, IAType[] primaryKeyTypes, ARecordType recordType, |
| ARecordType metaType, int[] filterFields, int[] primaryKeyIndexes, List<Integer> primaryKeyIndicators, |
| StorageComponentProvider storageComponentProvider, Index secondaryIndex, int numElementsHint) |
| throws HyracksDataException, RemoteException, ACIDException, AlgebricksException { |
| try { |
| MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); |
| org.apache.hyracks.algebricks.common.utils.Pair<ILSMMergePolicyFactory, Map<String, String>> mergePolicy = |
| DatasetUtil.getMergePolicyFactory(dataset, mdTxnCtx); |
| MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); |
| PrimaryIndexInfo primaryIndexInfo = new PrimaryIndexInfo(dataset, primaryKeyTypes, recordType, metaType, |
| mergePolicy.first, mergePolicy.second, filterFields, primaryKeyIndexes, primaryKeyIndicators); |
| SecondaryIndexInfo secondaryIndexInfo = new SecondaryIndexInfo(primaryIndexInfo, secondaryIndex); |
| IIndexDataflowHelperFactory secondaryIndexHelperFactory = new IndexDataflowHelperFactory( |
| storageComponentProvider.getStorageManager(), secondaryIndexInfo.fileSplitProvider); |
| IIndexDataflowHelperFactory primaryIndexHelperFactory = new IndexDataflowHelperFactory( |
| storageComponentProvider.getStorageManager(), primaryIndexInfo.getFileSplitProvider()); |
| Index.ValueIndexDetails secondaryIndexDetails = (Index.ValueIndexDetails) secondaryIndex.getIndexDetails(); |
| int[] fieldPermutation = new int[secondaryIndexDetails.getKeyFieldNames().size()]; |
| for (int i = 0; i < fieldPermutation.length; i++) { |
| fieldPermutation[i] = i; |
| } |
| int numPartitions = primaryIndexInfo.getFileSplitProvider().getFileSplits().length; |
| int[][] partitionsMap = TestUtil.getPartitionsMap(numPartitions); |
| IBinaryHashFunctionFactory[] pkHashFunFactories = primaryIndexInfo.hashFuncFactories; |
| ITuplePartitionerFactory tuplePartitionerFactory = new FieldHashPartitionerFactory( |
| primaryIndexInfo.primaryKeyIndexes, pkHashFunFactories, numPartitions); |
| LSMIndexBulkLoadOperatorNodePushable op = new LSMIndexBulkLoadOperatorNodePushable( |
| secondaryIndexHelperFactory, primaryIndexHelperFactory, ctx, 0, fieldPermutation, 1.0F, false, |
| numElementsHint, true, secondaryIndexInfo.rDesc, BulkLoadUsage.CREATE_INDEX, dataset.getDatasetId(), |
| null, tuplePartitionerFactory, partitionsMap); |
| op.setOutputFrameWriter(0, new SinkRuntimeFactory().createPushRuntime(ctx)[0], null); |
| return Pair.of(secondaryIndexInfo, op); |
| } catch (Throwable th) { |
| throw HyracksDataException.create(th); |
| } |
| } |
| |
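| /** |
| * Builds an insert pipeline: a {@link LSMPrimaryInsertOperatorNodePushable}, optionally |
| * followed by an assign operator plus a secondary-index insert, and terminated by a commit |
| * runtime. Returns the head (the primary insert operator) and tail (the commit runtime). |
| * <p> |
| * A minimal usage sketch, not taken from an existing test; {@code dataset}, |
| * {@code primaryKeyTypes}, and {@code recordType} are placeholder fixtures, and the primary |
| * index is assumed to have been created via {@link #createPrimaryIndex}: |
| * <pre>{@code |
| * // dataset, primaryKeyTypes, recordType: placeholder fixtures created by the test |
| * TestNodeController nc = new TestNodeController(null, false); |
| * nc.init(); |
| * IHyracksTaskContext ctx = nc.createTestContext(nc.newJobId(), 0, false); |
| * Pair<LSMPrimaryInsertOperatorNodePushable, IPushRuntime> pipeline = |
| *         nc.getInsertPipeline(ctx, dataset, primaryKeyTypes, recordType, null, null, |
| *                 new int[] { 0 }, Collections.singletonList(0), |
| *                 new StorageComponentProvider(), null, null); |
| * }</pre> |
| */ |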
| public Pair<LSMPrimaryInsertOperatorNodePushable, IPushRuntime> getInsertPipeline(IHyracksTaskContext ctx, |
| Dataset dataset, IAType[] primaryKeyTypes, ARecordType recordType, ARecordType metaType, int[] filterFields, |
| int[] primaryKeyIndexes, List<Integer> primaryKeyIndicators, |
| StorageComponentProvider storageComponentProvider, Index secondaryIndex, Index primaryKeyIndex) |
| throws AlgebricksException, HyracksDataException, RemoteException, ACIDException { |
| CcApplicationContext appCtx = |
| (CcApplicationContext) ExecutionTestUtil.integrationUtil.cc.getApplicationContext(); |
| MetadataProvider mdProvider = MetadataProvider.createWithDefaultNamespace(appCtx); |
| try { |
| MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); |
| org.apache.hyracks.algebricks.common.utils.Pair<ILSMMergePolicyFactory, Map<String, String>> mergePolicy = |
| DatasetUtil.getMergePolicyFactory(dataset, mdTxnCtx); |
| MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); |
| PrimaryIndexInfo primaryIndexInfo = new PrimaryIndexInfo(dataset, primaryKeyTypes, recordType, metaType, |
| mergePolicy.first, mergePolicy.second, filterFields, primaryKeyIndexes, primaryKeyIndicators); |
| IModificationOperationCallbackFactory modOpCallbackFactory = |
| dataset.getModificationCallbackFactory(storageComponentProvider, primaryIndexInfo.index, |
| IndexOperation.INSERT, primaryIndexInfo.primaryKeyIndexes); |
| ISearchOperationCallbackFactory searchOpCallbackFactory = |
| dataset.getSearchCallbackFactory(storageComponentProvider, primaryIndexInfo.index, |
| IndexOperation.INSERT, primaryIndexInfo.primaryKeyIndexes); |
| IRecordDescriptorProvider recordDescProvider = primaryIndexInfo.getInsertRecordDescriptorProvider(); |
| RecordDescriptor recordDesc = |
| recordDescProvider.getInputRecordDescriptor(new ActivityId(new OperatorDescriptorId(0), 0), 0); |
| IIndexDataflowHelperFactory indexHelperFactory = new IndexDataflowHelperFactory( |
| storageComponentProvider.getStorageManager(), primaryIndexInfo.getFileSplitProvider()); |
| IIndexDataflowHelperFactory pkIndexHelperFactory = null; |
| if (primaryKeyIndex != null) { |
| SecondaryIndexInfo pkIndexInfo = new SecondaryIndexInfo(primaryIndexInfo, primaryKeyIndex); |
| pkIndexHelperFactory = new IndexDataflowHelperFactory(storageComponentProvider.getStorageManager(), |
| pkIndexInfo.fileSplitProvider); |
| } |
| |
| int numPartitions = primaryIndexInfo.getFileSplitProvider().getFileSplits().length; |
| int[][] partitionsMap = TestUtil.getPartitionsMap(numPartitions); |
| IBinaryHashFunctionFactory[] pkHashFunFactories = primaryIndexInfo.hashFuncFactories; |
| ITuplePartitionerFactory tuplePartitionerFactory = new FieldHashPartitionerFactory( |
| primaryIndexInfo.primaryKeyIndexes, pkHashFunFactories, numPartitions); |
| |
| LSMPrimaryInsertOperatorNodePushable insertOp = new LSMPrimaryInsertOperatorNodePushable(ctx, |
| ctx.getTaskAttemptId().getTaskId().getPartition(), indexHelperFactory, pkIndexHelperFactory, |
| primaryIndexInfo.primaryIndexInsertFieldsPermutations, recordDesc, modOpCallbackFactory, |
| searchOpCallbackFactory, primaryKeyIndexes.length, filterFields, null, tuplePartitionerFactory, |
| partitionsMap); |
| // For now, this assumes a single secondary index, and recordDesc is always <pk, record, meta?>. |
| // For the secondary index, we create an assign operator that extracts the secondary keys, |
| // followed by the secondary-index LSMInsertDeleteOperatorNodePushable. |
| if (secondaryIndex != null) { |
| Index.ValueIndexDetails secondaryIndexDetails = |
| (Index.ValueIndexDetails) secondaryIndex.getIndexDetails(); |
| List<List<String>> skNames = secondaryIndexDetails.getKeyFieldNames(); |
| List<Integer> indicators = secondaryIndexDetails.getKeyFieldSourceIndicators(); |
| IScalarEvaluatorFactory[] secondaryFieldAccessEvalFactories = |
| new IScalarEvaluatorFactory[skNames.size()]; |
| for (int i = 0; i < skNames.size(); i++) { |
| ARecordType sourceType = dataset.hasMetaPart() |
| ? indicators.get(i).intValue() == Index.RECORD_INDICATOR ? recordType : metaType |
| : recordType; |
| int pos = skNames.get(i).size() > 1 ? -1 : sourceType.getFieldIndex(skNames.get(i).get(0)); |
| secondaryFieldAccessEvalFactories[i] = mdProvider.getDataFormat().getFieldAccessEvaluatorFactory( |
| mdProvider.getFunctionManager(), sourceType, skNames.get(i), pos, null); |
| } |
| // outColumns are the positions of the secondary keys computed by the assign runtime |
| int[] outColumns = new int[skNames.size()]; |
| // the projection list keeps the computed secondary keys followed by the primary keys |
| Index.ValueIndexDetails primaryIndexDetails = |
| (Index.ValueIndexDetails) primaryIndexInfo.index.getIndexDetails(); |
| int[] projectionList = new int[skNames.size() + primaryIndexDetails.getKeyFieldNames().size()]; |
| for (int i = 0; i < secondaryFieldAccessEvalFactories.length; i++) { |
| outColumns[i] = primaryIndexInfo.rDesc.getFieldCount() + i; |
| } |
| int projCount = 0; |
| for (int i = 0; i < secondaryFieldAccessEvalFactories.length; i++) { |
| projectionList[projCount++] = primaryIndexInfo.rDesc.getFieldCount() + i; |
| } |
| for (int i = 0; i < primaryIndexDetails.getKeyFieldNames().size(); i++) { |
| projectionList[projCount++] = i; |
| } |
| IPushRuntime assignOp = |
| new AssignRuntimeFactory(outColumns, secondaryFieldAccessEvalFactories, projectionList, true) |
| .createPushRuntime(ctx)[0]; |
| insertOp.setOutputFrameWriter(0, assignOp, primaryIndexInfo.rDesc); |
| assignOp.setInputRecordDescriptor(0, primaryIndexInfo.rDesc); |
| SecondaryIndexInfo secondaryIndexInfo = new SecondaryIndexInfo(primaryIndexInfo, secondaryIndex); |
| IIndexDataflowHelperFactory secondaryIndexHelperFactory = new IndexDataflowHelperFactory( |
| storageComponentProvider.getStorageManager(), secondaryIndexInfo.fileSplitProvider); |
| |
| IModificationOperationCallbackFactory secondaryModCallbackFactory = |
| dataset.getModificationCallbackFactory(storageComponentProvider, secondaryIndex, |
| IndexOperation.INSERT, primaryKeyIndexes); |
| |
| ITuplePartitionerFactory tuplePartitionerFactory2 = new FieldHashPartitionerFactory( |
| secondaryIndexInfo.primaryKeyIndexes, pkHashFunFactories, numPartitions); |
| LSMInsertDeleteOperatorNodePushable secondaryInsertOp = new LSMInsertDeleteOperatorNodePushable(ctx, |
| ctx.getTaskAttemptId().getTaskId().getPartition(), secondaryIndexInfo.insertFieldsPermutations, |
| secondaryIndexInfo.rDesc, IndexOperation.INSERT, false, secondaryIndexHelperFactory, |
| secondaryModCallbackFactory, null, null, tuplePartitionerFactory2, partitionsMap); |
| assignOp.setOutputFrameWriter(0, secondaryInsertOp, secondaryIndexInfo.rDesc); |
| |
| IPushRuntime commitOp = |
| dataset.getCommitRuntimeFactory(mdProvider, secondaryIndexInfo.primaryKeyIndexes, true) |
| .createPushRuntime(ctx)[0]; |
| |
| secondaryInsertOp.setOutputFrameWriter(0, commitOp, secondaryIndexInfo.rDesc); |
| commitOp.setInputRecordDescriptor(0, secondaryIndexInfo.rDesc); |
| return Pair.of(insertOp, commitOp); |
| } else { |
| IPushRuntime commitOp = |
| dataset.getCommitRuntimeFactory(mdProvider, primaryIndexInfo.primaryKeyIndexes, true) |
| .createPushRuntime(ctx)[0]; |
| insertOp.setOutputFrameWriter(0, commitOp, primaryIndexInfo.rDesc); |
| commitOp.setInputRecordDescriptor(0, primaryIndexInfo.rDesc); |
| return Pair.of(insertOp, commitOp); |
| } |
| } finally { |
| mdProvider.getLocks().unlock(); |
| } |
| } |
| |
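| /** |
| * Mirror of {@link #getInsertPipeline}: builds a delete pipeline rooted at a primary-index |
| * {@link LSMInsertDeleteOperatorNodePushable} operating in DELETE mode, optionally followed |
| * by an assign operator plus a secondary-index delete, and terminated by a commit runtime. |
| */ |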
| public Pair<LSMInsertDeleteOperatorNodePushable, IPushRuntime> getDeletePipeline(IHyracksTaskContext ctx, |
| Dataset dataset, IAType[] primaryKeyTypes, ARecordType recordType, ARecordType metaType, int[] filterFields, |
| int[] primaryKeyIndexes, List<Integer> primaryKeyIndicators, |
| StorageComponentProvider storageComponentProvider, Index secondaryIndex) |
| throws AlgebricksException, HyracksDataException, RemoteException, ACIDException { |
| CcApplicationContext appCtx = |
| (CcApplicationContext) ExecutionTestUtil.integrationUtil.cc.getApplicationContext(); |
| MetadataProvider mdProvider = MetadataProvider.createWithDefaultNamespace(appCtx); |
| try { |
| MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); |
| org.apache.hyracks.algebricks.common.utils.Pair<ILSMMergePolicyFactory, Map<String, String>> mergePolicy = |
| DatasetUtil.getMergePolicyFactory(dataset, mdTxnCtx); |
| MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); |
| PrimaryIndexInfo primaryIndexInfo = new PrimaryIndexInfo(dataset, primaryKeyTypes, recordType, metaType, |
| mergePolicy.first, mergePolicy.second, filterFields, primaryKeyIndexes, primaryKeyIndicators); |
| IModificationOperationCallbackFactory modOpCallbackFactory = |
| dataset.getModificationCallbackFactory(storageComponentProvider, primaryIndexInfo.index, |
| IndexOperation.DELETE, primaryIndexInfo.primaryKeyIndexes); |
| IRecordDescriptorProvider recordDescProvider = primaryIndexInfo.getInsertRecordDescriptorProvider(); |
| RecordDescriptor recordDesc = |
| recordDescProvider.getInputRecordDescriptor(new ActivityId(new OperatorDescriptorId(0), 0), 0); |
| IIndexDataflowHelperFactory indexHelperFactory = new IndexDataflowHelperFactory( |
| storageComponentProvider.getStorageManager(), primaryIndexInfo.getFileSplitProvider()); |
| int numPartitions = primaryIndexInfo.getFileSplitProvider().getFileSplits().length; |
| int[][] partitionsMap = TestUtil.getPartitionsMap(numPartitions); |
| IBinaryHashFunctionFactory[] pkHashFunFactories = primaryIndexInfo.hashFuncFactories; |
| ITuplePartitionerFactory tuplePartitionerFactory = new FieldHashPartitionerFactory( |
| primaryIndexInfo.primaryKeyIndexes, pkHashFunFactories, numPartitions); |
| LSMInsertDeleteOperatorNodePushable deleteOp = new LSMInsertDeleteOperatorNodePushable(ctx, |
| ctx.getTaskAttemptId().getTaskId().getPartition(), |
| primaryIndexInfo.primaryIndexInsertFieldsPermutations, recordDesc, IndexOperation.DELETE, true, |
| indexHelperFactory, modOpCallbackFactory, null, null, tuplePartitionerFactory, partitionsMap); |
| // For now, this assumes a single secondary index, and recordDesc is always <pk, record, meta?>. |
| // For the secondary index, we create an assign operator that extracts the secondary keys, |
| // followed by the secondary-index LSMInsertDeleteOperatorNodePushable in DELETE mode. |
| if (secondaryIndex != null) { |
| Index.ValueIndexDetails secondaryIndexDetails = |
| (Index.ValueIndexDetails) secondaryIndex.getIndexDetails(); |
| List<List<String>> skNames = secondaryIndexDetails.getKeyFieldNames(); |
| List<Integer> indicators = secondaryIndexDetails.getKeyFieldSourceIndicators(); |
| IScalarEvaluatorFactory[] secondaryFieldAccessEvalFactories = |
| new IScalarEvaluatorFactory[skNames.size()]; |
| for (int i = 0; i < skNames.size(); i++) { |
| ARecordType sourceType = dataset.hasMetaPart() |
| ? indicators.get(i).intValue() == Index.RECORD_INDICATOR ? recordType : metaType |
| : recordType; |
| int pos = skNames.get(i).size() > 1 ? -1 : sourceType.getFieldIndex(skNames.get(i).get(0)); |
| secondaryFieldAccessEvalFactories[i] = mdProvider.getDataFormat().getFieldAccessEvaluatorFactory( |
| mdProvider.getFunctionManager(), sourceType, skNames.get(i), pos, null); |
| } |
| // outColumns are the positions of the secondary keys computed by the assign runtime |
| int[] outColumns = new int[skNames.size()]; |
| // the projection list keeps the computed secondary keys followed by the primary keys |
| Index.ValueIndexDetails primaryIndexDetails = |
| (Index.ValueIndexDetails) primaryIndexInfo.index.getIndexDetails(); |
| int[] projectionList = new int[skNames.size() + primaryIndexDetails.getKeyFieldNames().size()]; |
| for (int i = 0; i < secondaryFieldAccessEvalFactories.length; i++) { |
| outColumns[i] = primaryIndexInfo.rDesc.getFieldCount() + i; |
| } |
| int projCount = 0; |
| for (int i = 0; i < secondaryFieldAccessEvalFactories.length; i++) { |
| projectionList[projCount++] = primaryIndexInfo.rDesc.getFieldCount() + i; |
| } |
| for (int i = 0; i < primaryIndexDetails.getKeyFieldNames().size(); i++) { |
| projectionList[projCount++] = i; |
| } |
| IPushRuntime assignOp = |
| new AssignRuntimeFactory(outColumns, secondaryFieldAccessEvalFactories, projectionList, true) |
| .createPushRuntime(ctx)[0]; |
| deleteOp.setOutputFrameWriter(0, assignOp, primaryIndexInfo.rDesc); |
| assignOp.setInputRecordDescriptor(0, primaryIndexInfo.rDesc); |
| SecondaryIndexInfo secondaryIndexInfo = new SecondaryIndexInfo(primaryIndexInfo, secondaryIndex); |
| IIndexDataflowHelperFactory secondaryIndexHelperFactory = new IndexDataflowHelperFactory( |
| storageComponentProvider.getStorageManager(), secondaryIndexInfo.fileSplitProvider); |
| |
| IModificationOperationCallbackFactory secondaryModCallbackFactory = |
| dataset.getModificationCallbackFactory(storageComponentProvider, secondaryIndex, |
| IndexOperation.DELETE, primaryKeyIndexes); |
| |
| ITuplePartitionerFactory tuplePartitionerFactory2 = new FieldHashPartitionerFactory( |
| secondaryIndexInfo.primaryKeyIndexes, pkHashFunFactories, numPartitions); |
| LSMInsertDeleteOperatorNodePushable secondaryDeleteOp = new LSMInsertDeleteOperatorNodePushable(ctx, |
| ctx.getTaskAttemptId().getTaskId().getPartition(), secondaryIndexInfo.insertFieldsPermutations, |
| secondaryIndexInfo.rDesc, IndexOperation.DELETE, false, secondaryIndexHelperFactory, |
| secondaryModCallbackFactory, null, null, tuplePartitionerFactory2, partitionsMap); |
| assignOp.setOutputFrameWriter(0, secondaryDeleteOp, secondaryIndexInfo.rDesc); |
| |
| IPushRuntime commitOp = |
| dataset.getCommitRuntimeFactory(mdProvider, secondaryIndexInfo.primaryKeyIndexes, true) |
| .createPushRuntime(ctx)[0]; |
| |
| secondaryDeleteOp.setOutputFrameWriter(0, commitOp, secondaryIndexInfo.rDesc); |
| commitOp.setInputRecordDescriptor(0, secondaryIndexInfo.rDesc); |
| return Pair.of(deleteOp, commitOp); |
| } else { |
| IPushRuntime commitOp = |
| dataset.getCommitRuntimeFactory(mdProvider, primaryIndexInfo.primaryKeyIndexes, true) |
| .createPushRuntime(ctx)[0]; |
| deleteOp.setOutputFrameWriter(0, commitOp, primaryIndexInfo.rDesc); |
| commitOp.setInputRecordDescriptor(0, primaryIndexInfo.rDesc); |
| return Pair.of(deleteOp, commitOp); |
| } |
| } finally { |
| mdProvider.getLocks().unlock(); |
| } |
| } |
| |
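| /** |
| * Assembles a full-scan pipeline over the primary index: an empty-tuple source feeding a |
| * {@link BTreeSearchOperatorNodePushable} whose output is handed to the caller-supplied |
| * {@code countOp} frame writer. Returns the source runtime, which the caller drives. |
| */ |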
| public IPushRuntime getFullScanPipeline(IFrameWriter countOp, IHyracksTaskContext ctx, Dataset dataset, |
| IAType[] primaryKeyTypes, ARecordType recordType, ARecordType metaType, |
| NoMergePolicyFactory mergePolicyFactory, Map<String, String> mergePolicyProperties, int[] filterFields, |
| int[] primaryKeyIndexes, List<Integer> primaryKeyIndicators, |
| StorageComponentProvider storageComponentProvider) throws HyracksDataException, AlgebricksException { |
| IPushRuntime emptyTupleOp = new EmptyTupleSourceRuntimeFactory().createPushRuntime(ctx)[0]; |
| JobSpecification spec = new JobSpecification(); |
| PrimaryIndexInfo primaryIndexInfo = new PrimaryIndexInfo(dataset, primaryKeyTypes, recordType, metaType, |
| mergePolicyFactory, mergePolicyProperties, filterFields, primaryKeyIndexes, primaryKeyIndicators); |
| IIndexDataflowHelperFactory indexDataflowHelperFactory = new IndexDataflowHelperFactory( |
| storageComponentProvider.getStorageManager(), primaryIndexInfo.getFileSplitProvider()); |
| BTreeSearchOperatorDescriptor searchOpDesc = new BTreeSearchOperatorDescriptor(spec, primaryIndexInfo.rDesc, |
| null, null, true, true, indexDataflowHelperFactory, false, false, null, |
| NoOpOperationCallbackFactory.INSTANCE, filterFields, filterFields, false, null); |
| BTreeSearchOperatorNodePushable searchOp = |
| searchOpDesc.createPushRuntime(ctx, primaryIndexInfo.getSearchRecordDescriptorProvider(), |
| ctx.getTaskAttemptId().getTaskId().getPartition(), 1); |
| emptyTupleOp.setOutputFrameWriter(0, searchOp, |
| primaryIndexInfo.getSearchRecordDescriptorProvider().getInputRecordDescriptor(null, 0)); |
| searchOp.setOutputFrameWriter(0, countOp, primaryIndexInfo.rDesc); |
| return emptyTupleOp; |
| } |
| |
| public LogReader getTransactionLogReader(boolean isRecoveryMode) { |
| return (LogReader) getTransactionSubsystem().getLogManager().getLogReader(isRecoveryMode); |
| } |
| |
| public JobId newJobId() { |
| return new JobId(jobCounter++); |
| } |
| |
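| /** |
| * Creates and builds the on-disk primary index for {@code dataset} at the given partition. |
| * <p> |
| * A hedged sketch of typical use ({@code dataset} and {@code recordType} are placeholder |
| * fixtures; a single BIGINT primary key at field 0 is assumed): |
| * <pre>{@code |
| * // dataset, recordType: placeholder fixtures created by the test |
| * PrimaryIndexInfo info = nc.createPrimaryIndex(dataset, |
| *         new IAType[] { BuiltinType.AINT64 }, recordType, null, null, |
| *         new StorageComponentProvider(), new int[] { 0 }, |
| *         Collections.singletonList(0), 0); |
| * }</pre> |
| */ |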
| public PrimaryIndexInfo createPrimaryIndex(Dataset dataset, IAType[] primaryKeyTypes, ARecordType recordType, |
| ARecordType metaType, int[] filterFields, IStorageComponentProvider storageComponentProvider, |
| int[] primaryKeyIndexes, List<Integer> primaryKeyIndicators, int partition) |
| throws AlgebricksException, HyracksDataException, RemoteException, ACIDException { |
| MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); |
| org.apache.hyracks.algebricks.common.utils.Pair<ILSMMergePolicyFactory, Map<String, String>> mergePolicy = |
| DatasetUtil.getMergePolicyFactory(dataset, mdTxnCtx); |
| MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); |
| PrimaryIndexInfo primaryIndexInfo = new PrimaryIndexInfo(dataset, primaryKeyTypes, recordType, metaType, |
| mergePolicy.first, mergePolicy.second, filterFields, primaryKeyIndexes, primaryKeyIndicators); |
| Dataverse dataverse = new Dataverse(dataset.getDatabaseName(), dataset.getDataverseName(), |
| NonTaggedDataFormat.class.getName(), MetadataUtil.PENDING_NO_OP); |
| Namespace namespace = new Namespace(dataverse.getDatabaseName(), dataverse.getDataverseName()); |
| MetadataProvider mdProvider = MetadataProvider.create( |
| (ICcApplicationContext) ExecutionTestUtil.integrationUtil.cc.getApplicationContext(), namespace); |
| try { |
| IResourceFactory resourceFactory = dataset.getResourceFactory(mdProvider, primaryIndexInfo.index, |
| recordType, metaType, mergePolicy.first, mergePolicy.second); |
| IndexBuilderFactory indexBuilderFactory = |
| new IndexBuilderFactory(storageComponentProvider.getStorageManager(), |
| primaryIndexInfo.getFileSplitProvider(), resourceFactory, true); |
| IHyracksTaskContext ctx = createTestContext(newJobId(), partition, false); |
| IIndexBuilder indexBuilder = indexBuilderFactory.create(ctx, partition); |
| indexBuilder.build(); |
| } finally { |
| mdProvider.getLocks().unlock(); |
| } |
| return primaryIndexInfo; |
| } |
| |
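| /** |
| * Creates and builds the on-disk secondary index described by {@code secondaryIndex} next to |
| * an existing primary index, returning the {@link SecondaryIndexInfo} needed to wire |
| * operators against it. |
| */ |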
| public SecondaryIndexInfo createSecondaryIndex(PrimaryIndexInfo primaryIndexInfo, Index secondaryIndex, |
| IStorageComponentProvider storageComponentProvider, int partition) |
| throws AlgebricksException, HyracksDataException, RemoteException, ACIDException { |
| MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); |
| org.apache.hyracks.algebricks.common.utils.Pair<ILSMMergePolicyFactory, Map<String, String>> mergePolicy = |
| DatasetUtil.getMergePolicyFactory(primaryIndexInfo.dataset, mdTxnCtx); |
| MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); |
| Dataverse dataverse = |
| new Dataverse(primaryIndexInfo.dataset.getDatabaseName(), primaryIndexInfo.dataset.getDataverseName(), |
| NonTaggedDataFormat.class.getName(), MetadataUtil.PENDING_NO_OP); |
| Namespace namespace = new Namespace(dataverse.getDatabaseName(), dataverse.getDataverseName()); |
| MetadataProvider mdProvider = MetadataProvider.create( |
| (ICcApplicationContext) ExecutionTestUtil.integrationUtil.cc.getApplicationContext(), namespace); |
| SecondaryIndexInfo secondaryIndexInfo = new SecondaryIndexInfo(primaryIndexInfo, secondaryIndex); |
| try { |
| IResourceFactory resourceFactory = primaryIndexInfo.dataset.getResourceFactory(mdProvider, secondaryIndex, |
| primaryIndexInfo.recordType, primaryIndexInfo.metaType, mergePolicy.first, mergePolicy.second); |
| IndexBuilderFactory indexBuilderFactory = |
| new IndexBuilderFactory(storageComponentProvider.getStorageManager(), |
| secondaryIndexInfo.fileSplitProvider, resourceFactory, true); |
| IHyracksTaskContext ctx = createTestContext(newJobId(), partition, false); |
| IIndexBuilder indexBuilder = indexBuilderFactory.create(ctx, partition); |
| indexBuilder.build(); |
| } finally { |
| mdProvider.getLocks().unlock(); |
| } |
| return secondaryIndexInfo; |
| } |
| |
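| /** |
| * Fills the serializer/deserializer slots of a primary-index tuple, whose layout is |
| * {@code <pk_0 ... pk_n, record, meta?>}; any trailing filter-field slots in the array are |
| * left unset. |
| */ |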
| public static ISerializerDeserializer<?>[] createPrimaryIndexSerdes(int primaryIndexNumOfTupleFields, |
| IAType[] primaryKeyTypes, ARecordType recordType, ARecordType metaType) { |
| int i = 0; |
| ISerializerDeserializer<?>[] primaryIndexSerdes = new ISerializerDeserializer<?>[primaryIndexNumOfTupleFields]; |
| for (; i < primaryKeyTypes.length; i++) { |
| primaryIndexSerdes[i] = |
| SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(primaryKeyTypes[i]); |
| } |
| primaryIndexSerdes[i++] = SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(recordType); |
| if (metaType != null) { |
| primaryIndexSerdes[i] = SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(metaType); |
| } |
| return primaryIndexSerdes; |
| } |
| |
| public static ISerializerDeserializer<?>[] createSecondaryIndexSerdes(ARecordType recordType, ARecordType metaType, |
| IAType[] primaryKeyTypes, IAType[] secondaryKeyTypes) { |
| ISerializerDeserializer<?>[] secondaryIndexSerdes = |
| new ISerializerDeserializer<?>[secondaryKeyTypes.length + primaryKeyTypes.length]; |
| int i = 0; |
| for (; i < secondaryKeyTypes.length; i++) { |
| secondaryIndexSerdes[i] = |
| SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(secondaryKeyTypes[i]); |
| } |
| for (; i < secondaryKeyTypes.length + primaryKeyTypes.length; i++) { |
| secondaryIndexSerdes[i] = SerializerDeserializerProvider.INSTANCE |
| .getSerializerDeserializer(primaryKeyTypes[i - secondaryKeyTypes.length]); |
| } |
| return secondaryIndexSerdes; |
| } |
| |
| public static ITypeTraits[] createPrimaryIndexTypeTraits(int primaryIndexNumOfTupleFields, IAType[] primaryKeyTypes, |
| ARecordType recordType, ARecordType metaType) { |
| ITypeTraits[] primaryIndexTypeTraits = new ITypeTraits[primaryIndexNumOfTupleFields]; |
| int i = 0; |
| for (; i < primaryKeyTypes.length; i++) { |
| primaryIndexTypeTraits[i] = TypeTraitProvider.INSTANCE.getTypeTrait(primaryKeyTypes[i]); |
| } |
| primaryIndexTypeTraits[i++] = TypeTraitProvider.INSTANCE.getTypeTrait(recordType); |
| if (metaType != null) { |
| primaryIndexTypeTraits[i] = TypeTraitProvider.INSTANCE.getTypeTrait(metaType); |
| } |
| return primaryIndexTypeTraits; |
| } |
| |
| public static ITypeTraits[] createSecondaryIndexTypeTraits(ARecordType recordType, ARecordType metaType, |
| IAType[] primaryKeyTypes, IAType[] secondaryKeyTypes) { |
| ITypeTraits[] secondaryIndexTypeTraits = new ITypeTraits[secondaryKeyTypes.length + primaryKeyTypes.length]; |
| int i = 0; |
| for (; i < secondaryKeyTypes.length; i++) { |
| secondaryIndexTypeTraits[i] = TypeTraitProvider.INSTANCE.getTypeTrait(secondaryKeyTypes[i]); |
| } |
| for (; i < secondaryKeyTypes.length + primaryKeyTypes.length; i++) { |
| secondaryIndexTypeTraits[i] = |
| TypeTraitProvider.INSTANCE.getTypeTrait(primaryKeyTypes[i - secondaryKeyTypes.length]); |
| } |
| return secondaryIndexTypeTraits; |
| } |
| |
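| /** |
| * Creates a Mockito-spied {@link IHyracksTaskContext} bound to the first NC, with a mocked |
| * joblet context that reports the given job id and registers a |
| * {@link JobEventListenerFactory} for its transaction. When {@code withMessaging} is set, a |
| * message frame is also placed in the task context. |
| */ |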
| public IHyracksTaskContext createTestContext(JobId jobId, int partition, boolean withMessaging) |
| throws HyracksDataException { |
| IHyracksTaskContext ctx = TestUtils.create(KB32, ExecutionTestUtil.integrationUtil.ncs[0].getIoManager()); |
| if (withMessaging) { |
| TaskUtil.put(HyracksConstants.KEY_MESSAGE, new VSizeFrame(ctx), ctx); |
| } |
| IHyracksJobletContext jobletCtx = Mockito.mock(IHyracksJobletContext.class); |
| JobEventListenerFactory factory = new JobEventListenerFactory(new TxnId(jobId.getId()), true); |
| Mockito.when(jobletCtx.getJobletEventListenerFactory()).thenReturn(factory); |
| Mockito.when(jobletCtx.getServiceContext()).thenReturn(ExecutionTestUtil.integrationUtil.ncs[0].getContext()); |
| Mockito.when(jobletCtx.getJobId()).thenReturn(jobId); |
| ctx = Mockito.spy(ctx); |
| Mockito.when(ctx.getJobletContext()).thenReturn(jobletCtx); |
| Mockito.when(ctx.getIoManager()).thenReturn(ExecutionTestUtil.integrationUtil.ncs[0].getIoManager()); |
| TaskAttemptId taskId = |
| new TaskAttemptId(new TaskId(new ActivityId(new OperatorDescriptorId(0), 0), partition), 0); |
| Mockito.when(ctx.getTaskAttemptId()).thenReturn(taskId); |
| return ctx; |
| } |
| |
| public TransactionSubsystem getTransactionSubsystem() { |
| return (TransactionSubsystem) ((NCAppRuntimeContext) ExecutionTestUtil.integrationUtil.ncs[0] |
| .getApplicationContext()).getTransactionSubsystem(); |
| } |
| |
| public ITransactionManager getTransactionManager() { |
| return getTransactionSubsystem().getTransactionManager(); |
| } |
| |
| public NCAppRuntimeContext getAppRuntimeContext() { |
| return (NCAppRuntimeContext) ExecutionTestUtil.integrationUtil.ncs[0].getApplicationContext(); |
| } |
| |
| public DatasetLifecycleManager getDatasetLifecycleManager() { |
| return (DatasetLifecycleManager) getAppRuntimeContext().getDatasetLifecycleManager(); |
| } |
| |
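| /** |
| * Holder for everything needed to operate on a secondary index in tests: its file splits, |
| * the {@code <sk..., pk...>} record descriptor, the identity insert permutation, and the |
| * positions of the primary keys within that descriptor. |
| */ |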
| public static class SecondaryIndexInfo { |
| final int[] primaryKeyIndexes; |
| final PrimaryIndexInfo primaryIndexInfo; |
| final Index secondaryIndex; |
| final ConstantFileSplitProvider fileSplitProvider; |
| final ISerializerDeserializer<?>[] secondaryIndexSerdes; |
| final RecordDescriptor rDesc; |
| final int[] insertFieldsPermutations; |
| final ITypeTraits[] secondaryIndexTypeTraits; |
| |
| public SecondaryIndexInfo(PrimaryIndexInfo primaryIndexInfo, Index secondaryIndex) { |
| this.primaryIndexInfo = primaryIndexInfo; |
| this.secondaryIndex = secondaryIndex; |
| Index.ValueIndexDetails secondaryIndexDetails = (Index.ValueIndexDetails) secondaryIndex.getIndexDetails(); |
| List<String> nodes = Collections.singletonList(ExecutionTestUtil.integrationUtil.ncs[0].getId()); |
| CcApplicationContext appCtx = |
| (CcApplicationContext) ExecutionTestUtil.integrationUtil.cc.getApplicationContext(); |
| String dvPath = new NamespacePathResolver(false).resolve(primaryIndexInfo.dataset.getDatabaseName(), |
| primaryIndexInfo.dataset.getDataverseName()); |
| FileSplit[] splits = SplitsAndConstraintsUtil.getIndexSplits(appCtx.getClusterStateManager(), |
| primaryIndexInfo.dataset, secondaryIndex.getIndexName(), nodes, dvPath); |
| fileSplitProvider = new ConstantFileSplitProvider(splits); |
| secondaryIndexTypeTraits = createSecondaryIndexTypeTraits(primaryIndexInfo.recordType, |
| primaryIndexInfo.metaType, primaryIndexInfo.primaryKeyTypes, secondaryIndexDetails |
| .getKeyFieldTypes().toArray(new IAType[secondaryIndexDetails.getKeyFieldTypes().size()])); |
| secondaryIndexSerdes = createSecondaryIndexSerdes(primaryIndexInfo.recordType, primaryIndexInfo.metaType, |
| primaryIndexInfo.primaryKeyTypes, secondaryIndexDetails.getKeyFieldTypes() |
| .toArray(new IAType[secondaryIndexDetails.getKeyFieldTypes().size()])); |
| rDesc = new RecordDescriptor(secondaryIndexSerdes, secondaryIndexTypeTraits); |
| insertFieldsPermutations = new int[secondaryIndexTypeTraits.length]; |
| for (int i = 0; i < insertFieldsPermutations.length; i++) { |
| insertFieldsPermutations[i] = i; |
| } |
| primaryKeyIndexes = new int[primaryIndexInfo.primaryKeyIndexes.length]; |
| for (int i = 0; i < primaryKeyIndexes.length; i++) { |
| primaryKeyIndexes[i] = i + secondaryIndexDetails.getKeyFieldNames().size(); |
| } |
| } |
| |
| public IFileSplitProvider getFileSplitProvider() { |
| return fileSplitProvider; |
| } |
| |
| public ISerializerDeserializer<?>[] getSerdes() { |
| return secondaryIndexSerdes; |
| } |
| } |
| |
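| /** |
| * Holder for a dataset's primary index as used by these tests: the tuple layout |
| * {@code <pk..., record, meta?, filter...>}, its serdes and type traits, primary-key hash |
| * function factories, file splits, and the {@link Index} metadata entity itself. |
| */ |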
| public static class PrimaryIndexInfo { |
| private final Dataset dataset; |
| private final IAType[] primaryKeyTypes; |
| private final ARecordType recordType; |
| private final ARecordType metaType; |
| private final ILSMMergePolicyFactory mergePolicyFactory; |
| private final Map<String, String> mergePolicyProperties; |
| private final int primaryIndexNumOfTupleFields; |
| private final ITypeTraits[] primaryIndexTypeTraits; |
| private final IBinaryHashFunctionFactory[] hashFuncFactories; |
| private final ISerializerDeserializer<?>[] primaryIndexSerdes; |
| private final ConstantFileSplitProvider fileSplitProvider; |
| private final RecordDescriptor rDesc; |
| private final int[] primaryIndexInsertFieldsPermutations; |
| private final int[] primaryKeyIndexes; |
| private final Index index; |
| |
| public PrimaryIndexInfo(Dataset dataset, IAType[] primaryKeyTypes, ARecordType recordType, ARecordType metaType, |
| ILSMMergePolicyFactory mergePolicyFactory, Map<String, String> mergePolicyProperties, |
| int[] filterFields, int[] primaryKeyIndexes, List<Integer> primaryKeyIndicators) |
| throws AlgebricksException { |
| this.dataset = dataset; |
| this.primaryKeyTypes = primaryKeyTypes; |
| this.recordType = recordType; |
| this.metaType = metaType; |
| this.mergePolicyFactory = mergePolicyFactory; |
| this.mergePolicyProperties = mergePolicyProperties; |
| this.primaryKeyIndexes = primaryKeyIndexes; |
| primaryIndexNumOfTupleFields = primaryKeyTypes.length + (1 + ((metaType == null) ? 0 : 1)) |
| + (filterFields != null ? filterFields.length : 0); |
| primaryIndexTypeTraits = |
| createPrimaryIndexTypeTraits(primaryIndexNumOfTupleFields, primaryKeyTypes, recordType, metaType); |
| hashFuncFactories = new IBinaryHashFunctionFactory[primaryKeyTypes.length]; |
| for (int i = 0; i < primaryKeyTypes.length; i++) { |
| hashFuncFactories[i] = |
| BinaryHashFunctionFactoryProvider.INSTANCE.getBinaryHashFunctionFactory(primaryKeyTypes[i]); |
| } |
| primaryIndexSerdes = |
| createPrimaryIndexSerdes(primaryIndexNumOfTupleFields, primaryKeyTypes, recordType, metaType); |
| rDesc = new RecordDescriptor(primaryIndexSerdes, primaryIndexTypeTraits); |
| primaryIndexInsertFieldsPermutations = new int[primaryIndexNumOfTupleFields]; |
| for (int i = 0; i < primaryIndexNumOfTupleFields; i++) { |
| primaryIndexInsertFieldsPermutations[i] = i; |
| } |
| List<List<String>> keyFieldNames = new ArrayList<>(); |
| List<IAType> keyFieldTypes = Arrays.asList(primaryKeyTypes); |
| for (int i = 0; i < primaryKeyIndicators.size(); i++) { |
| Integer indicator = primaryKeyIndicators.get(i); |
| String[] fieldNames = |
| indicator == Index.RECORD_INDICATOR ? recordType.getFieldNames() : metaType.getFieldNames(); |
| keyFieldNames.add(Collections.singletonList(fieldNames[primaryKeyIndexes[i]])); |
| } |
| index = Index.createPrimaryIndex(dataset.getDatabaseName(), dataset.getDataverseName(), |
| dataset.getDatasetName(), keyFieldNames, primaryKeyIndicators, keyFieldTypes, |
| MetadataUtil.PENDING_NO_OP); |
| List<String> nodes = Collections.singletonList(ExecutionTestUtil.integrationUtil.ncs[0].getId()); |
| CcApplicationContext appCtx = |
| (CcApplicationContext) ExecutionTestUtil.integrationUtil.cc.getApplicationContext(); |
| String dvPath = |
| new NamespacePathResolver(false).resolve(dataset.getDatabaseName(), dataset.getDataverseName()); |
| FileSplit[] splits = SplitsAndConstraintsUtil.getIndexSplits(appCtx.getClusterStateManager(), dataset, |
| index.getIndexName(), nodes, dvPath); |
| fileSplitProvider = new ConstantFileSplitProvider(splits); |
| } |
| |
| public Index getIndex() { |
| return index; |
| } |
| |
| public Dataset getDataset() { |
| return dataset; |
| } |
| |
| public IRecordDescriptorProvider getInsertRecordDescriptorProvider() { |
| IRecordDescriptorProvider rDescProvider = Mockito.mock(IRecordDescriptorProvider.class); |
| Mockito.when(rDescProvider.getInputRecordDescriptor(Mockito.any(), Mockito.anyInt())).thenReturn(rDesc); |
| return rDescProvider; |
| } |
| |
| public IRecordDescriptorProvider getSearchRecordDescriptorProvider() { |
| ITypeTraits[] primaryKeyTypeTraits = new ITypeTraits[primaryKeyTypes.length]; |
| ISerializerDeserializer<?>[] primaryKeySerdes = new ISerializerDeserializer<?>[primaryKeyTypes.length]; |
| for (int i = 0; i < primaryKeyTypes.length; i++) { |
| primaryKeyTypeTraits[i] = TypeTraitProvider.INSTANCE.getTypeTrait(primaryKeyTypes[i]); |
| primaryKeySerdes[i] = |
| SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(primaryKeyTypes[i]); |
| } |
| RecordDescriptor searchRecDesc = new RecordDescriptor(primaryKeySerdes, primaryKeyTypeTraits); |
| IRecordDescriptorProvider rDescProvider = Mockito.mock(IRecordDescriptorProvider.class); |
| Mockito.when(rDescProvider.getInputRecordDescriptor(Mockito.any(), Mockito.anyInt())) |
| .thenReturn(searchRecDesc); |
| return rDescProvider; |
| } |
| |
| public ConstantFileSplitProvider getFileSplitProvider() { |
| return fileSplitProvider; |
| } |
| } |
| |
| public RecordDescriptor getSearchOutputDesc(IAType[] keyTypes, ARecordType recordType, ARecordType metaType) { |
| int primaryIndexNumOfTupleFields = keyTypes.length + (1 + ((metaType == null) ? 0 : 1)); |
| ITypeTraits[] primaryIndexTypeTraits = |
| createPrimaryIndexTypeTraits(primaryIndexNumOfTupleFields, keyTypes, recordType, metaType); |
| ISerializerDeserializer<?>[] primaryIndexSerdes = |
| createPrimaryIndexSerdes(primaryIndexNumOfTupleFields, keyTypes, recordType, metaType); |
| return new RecordDescriptor(primaryIndexSerdes, primaryIndexTypeTraits); |
| } |
| |
| public IndexDataflowHelperFactory getPrimaryIndexDataflowHelperFactory(PrimaryIndexInfo primaryIndexInfo, |
| IStorageComponentProvider storageComponentProvider) throws AlgebricksException { |
| return new IndexDataflowHelperFactory(storageComponentProvider.getStorageManager(), |
| primaryIndexInfo.getFileSplitProvider()); |
| } |
| |
| public IStorageManager getStorageManager() { |
| CcApplicationContext appCtx = |
| (CcApplicationContext) ExecutionTestUtil.integrationUtil.cc.getApplicationContext(); |
| return appCtx.getStorageManager(); |
| } |
| |
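| /** |
| * Builds an upsert pipeline: a {@link LSMPrimaryUpsertOperatorNodePushable} feeding a |
| * {@link CommitRuntime}. The operator's output prepends an upsert/delete indicator and the |
| * previous record (plus previous meta and filter values, when present) to the input tuple; |
| * see {@link #getUpsertOutRecDesc}. |
| * <p> |
| * Illustrative sketch only (placeholder fixtures, no frame-op callback, no secondaries): |
| * <pre>{@code |
| * // ctx, dataset, keyTypes, recordType: placeholder fixtures created by the test |
| * Pair<LSMPrimaryUpsertOperatorNodePushable, CommitRuntime> pipeline = |
| *         nc.getUpsertPipeline(ctx, dataset, keyTypes, recordType, null, null, |
| *                 new int[] { 0 }, Collections.singletonList(0), |
| *                 new StorageComponentProvider(), null, false); |
| * }</pre> |
| */ |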
| public Pair<LSMPrimaryUpsertOperatorNodePushable, CommitRuntime> getUpsertPipeline(IHyracksTaskContext ctx, |
| Dataset dataset, IAType[] keyTypes, ARecordType recordType, ARecordType metaType, int[] filterFields, |
| int[] keyIndexes, List<Integer> keyIndicators, StorageComponentProvider storageComponentProvider, |
| IFrameOperationCallbackFactory frameOpCallbackFactory, boolean hasSecondaries) throws Exception { |
| MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); |
| MetadataProvider mdProvider = MetadataProvider.createWithDefaultNamespace( |
| (ICcApplicationContext) ExecutionTestUtil.integrationUtil.cc.getApplicationContext()); |
| org.apache.hyracks.algebricks.common.utils.Pair<ILSMMergePolicyFactory, Map<String, String>> mergePolicy = |
| DatasetUtil.getMergePolicyFactory(dataset, mdTxnCtx); |
| MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); |
| PrimaryIndexInfo primaryIndexInfo = new PrimaryIndexInfo(dataset, keyTypes, recordType, metaType, |
| mergePolicy.first, mergePolicy.second, filterFields, keyIndexes, keyIndicators); |
| IModificationOperationCallbackFactory modificationCallbackFactory = dataset.getModificationCallbackFactory( |
| storageComponentProvider, primaryIndexInfo.index, IndexOperation.UPSERT, keyIndexes); |
| ISearchOperationCallbackFactory searchCallbackFactory = dataset.getSearchCallbackFactory( |
| storageComponentProvider, primaryIndexInfo.index, IndexOperation.UPSERT, keyIndexes); |
| IRecordDescriptorProvider recordDescProvider = primaryIndexInfo.getInsertRecordDescriptorProvider(); |
| IIndexDataflowHelperFactory indexHelperFactory = new IndexDataflowHelperFactory( |
| storageComponentProvider.getStorageManager(), primaryIndexInfo.getFileSplitProvider()); |
| int numPartitions = primaryIndexInfo.getFileSplitProvider().getFileSplits().length; |
| int[][] partitionsMap = TestUtil.getPartitionsMap(numPartitions); |
| IBinaryHashFunctionFactory[] pkHashFunFactories = primaryIndexInfo.hashFuncFactories; |
| ITuplePartitionerFactory tuplePartitionerFactory = |
| new FieldHashPartitionerFactory(primaryIndexInfo.primaryKeyIndexes, pkHashFunFactories, numPartitions); |
| LSMPrimaryUpsertOperatorNodePushable insertOp = |
| new LSMPrimaryUpsertOperatorNodePushable(ctx, ctx.getTaskAttemptId().getTaskId().getPartition(), |
| indexHelperFactory, primaryIndexInfo.primaryIndexInsertFieldsPermutations, |
| recordDescProvider.getInputRecordDescriptor(new ActivityId(new OperatorDescriptorId(0), 0), 0), |
| modificationCallbackFactory, searchCallbackFactory, keyIndexes.length, 0, recordType, -1, |
| frameOpCallbackFactory == null ? dataset.getFrameOpCallbackFactory(mdProvider) |
| : frameOpCallbackFactory, |
| MissingWriterFactory.INSTANCE, hasSecondaries, NoOpTupleProjectorFactory.INSTANCE, |
| tuplePartitionerFactory, partitionsMap); |
| RecordDescriptor upsertOutRecDesc = getUpsertOutRecDesc(primaryIndexInfo.rDesc, dataset, |
| filterFields == null ? 0 : filterFields.length, recordType, metaType); |
| // compute the positions of the primary key fields within the commit operator's input |
| int start = 1 + (dataset.hasMetaPart() ? 2 : 1) + (filterFields == null ? 0 : filterFields.length); |
| int[] pkFieldsInCommitOp = new int[dataset.getPrimaryKeys().size()]; |
| for (int i = 0; i < pkFieldsInCommitOp.length; i++) { |
| pkFieldsInCommitOp[i] = start++; |
| } |
| CommitRuntime commitOp = new CommitRuntime(ctx, getTxnJobId(ctx), dataset.getDatasetId(), pkFieldsInCommitOp, |
| true, ctx.getTaskAttemptId().getTaskId().getPartition(), true, null, null); |
| insertOp.setOutputFrameWriter(0, commitOp, upsertOutRecDesc); |
| commitOp.setInputRecordDescriptor(0, upsertOutRecDesc); |
| return Pair.of(insertOp, commitOp); |
| } |
| |
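| /** |
| * Computes the output record descriptor of the upsert operator: an AINT8 upsert/delete |
| * indicator, the previous record, the previous meta (when the dataset has a meta part), the |
| * previous filter value (when any), followed by all fields of the input record descriptor. |
| */ |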
| private RecordDescriptor getUpsertOutRecDesc(RecordDescriptor inputRecordDesc, Dataset dataset, int numFilterFields, |
| ARecordType itemType, ARecordType metaItemType) throws Exception { |
| // 1 indicator field at the beginning (an AINT8 flag) marking whether the operation was an upsert or a delete |
| int numOutFields = 1 + (dataset.hasMetaPart() ? 2 : 1) + numFilterFields + inputRecordDesc.getFieldCount(); |
| ITypeTraits[] outputTypeTraits = new ITypeTraits[numOutFields]; |
| ISerializerDeserializer<?>[] outputSerDes = new ISerializerDeserializer[numOutFields]; |
| |
| ISerializerDeserializerProvider serdeProvider = FormatUtils.getDefaultFormat().getSerdeProvider(); |
| ITypeTraitProvider typeTraitProvider = FormatUtils.getDefaultFormat().getTypeTraitProvider(); |
| int f = 0; |
| // add the upsert/delete indicator field (encoded as AINT8) |
| outputSerDes[f] = serdeProvider.getSerializerDeserializer(BuiltinType.AINT8); |
| outputTypeTraits[f] = typeTraitProvider.getTypeTrait(BuiltinType.AINT8); |
| f++; |
| // add the previous record |
| outputSerDes[f] = serdeProvider.getSerializerDeserializer(itemType); |
| outputTypeTraits[f] = typeTraitProvider.getTypeTrait(itemType); |
| f++; |
| // add the previous meta second |
| if (dataset.hasMetaPart()) { |
| outputSerDes[f] = serdeProvider.getSerializerDeserializer(metaItemType); |
| outputTypeTraits[f] = typeTraitProvider.getTypeTrait(metaItemType); |
| f++; |
| } |
| // add the previous filter third |
| int fieldIdx = -1; |
| if (numFilterFields > 0) { |
| String filterField = DatasetUtil.getFilterField(dataset).get(0); |
| String[] fieldNames = itemType.getFieldNames(); |
| int i = 0; |
| for (; i < fieldNames.length; i++) { |
| if (fieldNames[i].equals(filterField)) { |
| break; |
| } |
| } |
| fieldIdx = i; |
| outputTypeTraits[f] = typeTraitProvider.getTypeTrait(itemType.getFieldTypes()[fieldIdx]); |
| outputSerDes[f] = serdeProvider.getSerializerDeserializer(itemType.getFieldTypes()[fieldIdx]); |
| f++; |
| } |
| for (int j = 0; j < inputRecordDesc.getFieldCount(); j++) { |
| outputTypeTraits[j + f] = inputRecordDesc.getTypeTraits()[j]; |
| outputSerDes[j + f] = inputRecordDesc.getFields()[j]; |
| } |
| return new RecordDescriptor(outputSerDes, outputTypeTraits); |
| } |
| } |