blob: 7e51ec16dbeb3a9a1d0e218cdb0337a5ed102f8c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.asterix.runtime.operators;
import java.nio.ByteBuffer;
import org.apache.asterix.common.api.INcApplicationContext;
import org.apache.asterix.common.dataflow.LSMIndexUtil;
import org.apache.asterix.common.dataflow.NoOpFrameOperationCallbackFactory;
import org.apache.asterix.common.transactions.ILogMarkerCallback;
import org.apache.asterix.common.transactions.PrimaryIndexLogMarkerCallback;
import org.apache.asterix.transaction.management.opcallbacks.LockThenSearchOperationCallback;
import org.apache.hyracks.api.comm.VSizeFrame;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.value.ITuplePartitionerFactory;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.exceptions.ErrorCode;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.exceptions.SourceLocation;
import org.apache.hyracks.api.util.CleanupUtils;
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor;
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference;
import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
import org.apache.hyracks.dataflow.common.data.accessors.PermutingFrameTupleReference;
import org.apache.hyracks.dataflow.common.utils.TaskUtil;
import org.apache.hyracks.storage.am.btree.impls.RangePredicate;
import org.apache.hyracks.storage.am.btree.util.BTreeUtils;
import org.apache.hyracks.storage.am.common.api.IIndexDataflowHelper;
import org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
import org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
import org.apache.hyracks.storage.am.common.api.ITreeIndex;
import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
import org.apache.hyracks.storage.am.common.impls.IndexAccessParameters;
import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
import org.apache.hyracks.storage.am.lsm.common.api.IFrameOperationCallback;
import org.apache.hyracks.storage.am.lsm.common.api.IFrameTupleProcessor;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
import org.apache.hyracks.storage.am.lsm.common.dataflow.LSMIndexInsertUpdateDeleteOperatorNodePushable;
import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex;
import org.apache.hyracks.storage.am.lsm.common.impls.LSMTreeIndexAccessor;
import org.apache.hyracks.storage.common.IIndex;
import org.apache.hyracks.storage.common.IIndexAccessParameters;
import org.apache.hyracks.storage.common.IIndexAccessor;
import org.apache.hyracks.storage.common.IIndexCursor;
import org.apache.hyracks.storage.common.MultiComparator;
import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
import it.unimi.dsi.fastutil.ints.IntSet;
/**
 * Operator pushable that inserts tuples into a primary LSM index while enforcing primary-key
 * uniqueness. For each incoming tuple it first probes the index (or a dedicated key-only index,
 * when a {@code keyIndexHelperFactory} is supplied) under a {@link LockThenSearchOperationCallback};
 * if the key already exists, the tuples of the current frame that were inserted so far are flushed
 * downstream and a nested duplicate-key exception is thrown, otherwise the tuple is force-upserted.
 * Tuples are routed to storage partitions by the tuple partitioner and processed in per-partition
 * batches via {@link LSMTreeIndexAccessor#batchOperate}.
 */
public class LSMPrimaryInsertOperatorNodePushable extends LSMIndexInsertUpdateDeleteOperatorNodePushable {

    // Per-partition dataflow helpers for the optional key-only index (entries stay null when no
    // keyIndexHelperFactory was provided to the constructor).
    private final IIndexDataflowHelper[] keyIndexHelpers;
    // Comparator over the primary-key search fields; created in open() from the index of partition 0.
    private MultiComparator keySearchCmp;
    // Reusable predicate for the uniqueness probe; reset to a point lookup per tuple in process().
    private RangePredicate searchPred;
    // Per-partition search cursors used by the uniqueness probe.
    private final IIndexCursor[] cursors;
    // Per-partition callbacks that lock the key before searching and release the lock on duplicates.
    private final LockThenSearchOperationCallback[] searchCallbacks;
    private final ISearchOperationCallbackFactory searchCallbackFactory;
    // Per-partition frame-tuple processors implementing the probe-then-upsert logic
    // (built in createTupleProcessors()).
    private final IFrameTupleProcessor[] processors;
    // Per-partition accessors for the key-only index (entries stay null when no key index is used).
    private final LSMTreeIndexAccessor[] lsmAccessorForKeyIndexes;
    // Per-partition accessors used exclusively for the uniqueness-check searches.
    private final LSMTreeIndexAccessor[] lsmAccessorForUniqunessChecks;
    private final IFrameOperationCallback[] frameOpCallbacks;
    // True when part of the current frame has already been flushed downstream because a duplicate
    // key interrupted batch processing.
    private boolean flushedPartialTuples;
    // Index of the next tuple to process within the current frame; lets process() skip tuples that
    // were already handled when a batch is re-driven after a duplicate-key exception.
    private int currentTupleIdx;
    // Tuples in [lastFlushedTupleIdx, currentTupleIdx) have been processed but not yet forwarded.
    private int lastFlushedTupleIdx;
    // Projects the primary-key (plus optional filter) fields out of the incoming frame tuple.
    private final PermutingFrameTupleReference keyTuple;
    // Maps a storage-partition index to the set of tuple indexes of the current frame routed to it;
    // cleared and rebuilt per frame in nextFrame().
    private final Int2ObjectMap<IntSet> partition2TuplesMap = new Int2ObjectOpenHashMap<>();
    private final SourceLocation sourceLoc;

    /**
     * @param ctx task context
     * @param partition dataflow partition this pushable runs in
     * @param indexHelperFactory creates helpers for the primary index, one per storage partition
     * @param keyIndexHelperFactory optional factory for a key-only index used for uniqueness
     *            checks and key upserts; may be null, in which case the primary index itself is
     *            probed for duplicates
     * @param fieldPermutation mapping from index fields to input-record fields; the first
     *            {@code numOfPrimaryKeys} entries are the primary-key fields
     * @param inputRecDesc descriptor of the incoming frames
     * @param modCallbackFactory creates the modification callbacks used for the upserts
     * @param searchCallbackFactory creates the lock-then-search callbacks for the duplicate probe
     * @param numOfPrimaryKeys number of primary-key fields
     * @param filterFields optional filter-field positions appended to the search-key projection
     * @param sourceLoc source location used for error reporting
     * @param tuplePartitionerFactory routes tuples to storage partitions
     * @param partitionsMap maps this dataflow partition to its storage partitions
     * @throws HyracksDataException if creating a key-index dataflow helper fails
     */
    public LSMPrimaryInsertOperatorNodePushable(IHyracksTaskContext ctx, int partition,
            IIndexDataflowHelperFactory indexHelperFactory, IIndexDataflowHelperFactory keyIndexHelperFactory,
            int[] fieldPermutation, RecordDescriptor inputRecDesc,
            IModificationOperationCallbackFactory modCallbackFactory,
            ISearchOperationCallbackFactory searchCallbackFactory, int numOfPrimaryKeys, int[] filterFields,
            SourceLocation sourceLoc, ITuplePartitionerFactory tuplePartitionerFactory, int[][] partitionsMap)
            throws HyracksDataException {
        // UPSERT (not INSERT) is used so existing entries can be overwritten by forceUpsert after
        // the explicit duplicate check below has ruled out true duplicates.
        super(ctx, partition, indexHelperFactory, fieldPermutation, inputRecDesc, IndexOperation.UPSERT,
                modCallbackFactory, null, tuplePartitionerFactory, partitionsMap);
        this.sourceLoc = sourceLoc;
        // All per-partition arrays are sized by the storage partitions owned by this pushable.
        this.frameOpCallbacks = new IFrameOperationCallback[partitions.length];
        this.searchCallbacks = new LockThenSearchOperationCallback[partitions.length];
        this.cursors = new IIndexCursor[partitions.length];
        this.lsmAccessorForUniqunessChecks = new LSMTreeIndexAccessor[partitions.length];
        this.lsmAccessorForKeyIndexes = new LSMTreeIndexAccessor[partitions.length];
        this.keyIndexHelpers = new IIndexDataflowHelper[partitions.length];
        this.processors = new IFrameTupleProcessor[partitions.length];
        if (keyIndexHelperFactory != null) {
            for (int i = 0; i < partitions.length; i++) {
                this.keyIndexHelpers[i] =
                        keyIndexHelperFactory.create(ctx.getJobletContext().getServiceContext(), partitions[i]);
            }
        }
        this.searchCallbackFactory = searchCallbackFactory;
        // The search-key projection is the primary-key fields followed by the filter fields (if any).
        int numFilterFieds = filterFields != null ? filterFields.length : 0;
        int[] searchKeyPermutations = new int[numOfPrimaryKeys + numFilterFieds];
        for (int i = 0; i < numOfPrimaryKeys; i++) {
            searchKeyPermutations[i] = fieldPermutation[i];
        }
        if (filterFields != null) {
            for (int i = numOfPrimaryKeys; i < searchKeyPermutations.length; i++) {
                searchKeyPermutations[i] = filterFields[i - numOfPrimaryKeys];
            }
        }
        keyTuple = new PermutingFrameTupleReference(searchKeyPermutations);
    }

    /**
     * Hook invoked on each non-duplicate tuple right before it is persisted.
     */
    protected void beforeModification(ITupleReference tuple) {
        // this is used for extensions to modify tuples before persistence
        // do nothing in the master branch
    }

    /**
     * Builds one {@link IFrameTupleProcessor} per storage partition. Must run after open() has
     * populated the accessor/cursor/callback arrays, since each processor captures the entries
     * for its partition into effectively-final locals.
     */
    protected void createTupleProcessors(SourceLocation sourceLoc) {
        for (int i = 0; i < partitions.length; i++) {
            // Snapshot this partition's collaborators for capture by the anonymous processor.
            LSMTreeIndexAccessor lsmAccessorForUniqunessCheck = lsmAccessorForUniqunessChecks[i];
            IIndexCursor cursor = cursors[i];
            IIndexAccessor indexAccessor = indexAccessors[i];
            LSMTreeIndexAccessor lsmAccessorForKeyIndex = lsmAccessorForKeyIndexes[i];
            LockThenSearchOperationCallback searchCallback = searchCallbacks[i];
            processors[i] = new IFrameTupleProcessor() {
                @Override
                public void process(FrameTupleAccessor accessor, ITupleReference tuple, int index)
                        throws HyracksDataException {
                    if (index < currentTupleIdx) {
                        // already processed; skip
                        return;
                    }
                    // Point lookup on the tuple's key to detect a duplicate.
                    keyTuple.reset(accessor, index);
                    searchPred.reset(keyTuple, keyTuple, true, true, keySearchCmp, keySearchCmp);
                    boolean duplicate = false;
                    lsmAccessorForUniqunessCheck.search(cursor, searchPred);
                    try {
                        if (cursor.hasNext()) {
                            // duplicate, skip
                            searchCallback.release();
                            duplicate = true;
                        }
                    } finally {
                        cursor.close();
                    }
                    if (!duplicate) {
                        beforeModification(tuple);
                        ((ILSMIndexAccessor) indexAccessor).forceUpsert(tuple);
                        if (lsmAccessorForKeyIndex != null) {
                            // Mirror the key into the key-only index as well.
                            lsmAccessorForKeyIndex.forceUpsert(keyTuple);
                        }
                    } else {
                        // we should flush previous inserted records so that these transactions can commit
                        // (currentTupleIdx has not been advanced yet, so the duplicate itself is excluded)
                        flushPartialFrame();
                        // feed requires this nested exception to remove duplicated tuples
                        // TODO: a better way to treat duplicates?
                        throw HyracksDataException.create(ErrorCode.ERROR_PROCESSING_TUPLE,
                                HyracksDataException.create(ErrorCode.DUPLICATE_KEY), sourceLoc, index);
                    }
                    currentTupleIdx = index + 1;
                }

                @Override
                public void start() throws HyracksDataException {
                    ((LSMTreeIndexAccessor) indexAccessor).getCtx().setOperation(IndexOperation.UPSERT);
                }

                @Override
                public void finish() throws HyracksDataException {
                    ((LSMTreeIndexAccessor) indexAccessor).getCtx().setOperation(IndexOperation.UPSERT);
                }

                @Override
                public void fail(Throwable th) {
                    // no op
                }
            };
        }
    }

    /**
     * Opens all per-partition indexes (and key indexes, when configured), wires up modification /
     * search callbacks, accessors, and cursors, then builds the tuple processors and the shared
     * search predicate.
     */
    @Override
    public void open() throws HyracksDataException {
        currentTupleIdx = 0;
        lastFlushedTupleIdx = 0;
        flushedPartialTuples = false;
        accessor = new FrameTupleAccessor(inputRecDesc);
        writeBuffer = new VSizeFrame(ctx);
        try {
            INcApplicationContext appCtx =
                    (INcApplicationContext) ctx.getJobletContext().getServiceContext().getApplicationContext();
            for (int i = 0; i < partitions.length; i++) {
                IIndexDataflowHelper indexHelper = indexHelpers[i];
                indexHelper.open();
                indexes[i] = indexHelper.getIndexInstance();
                IIndex index = indexes[i];
                IIndexDataflowHelper keyIndexHelper = keyIndexHelpers[i];
                // When a key-only index exists, duplicates are probed against it; otherwise the
                // primary index itself is probed.
                IIndex indexForUniquessCheck;
                if (keyIndexHelper != null) {
                    keyIndexHelper.open();
                    indexForUniquessCheck = keyIndexHelper.getIndexInstance();
                } else {
                    indexForUniquessCheck = index;
                }
                // Register the log-marker callback only once, against the first partition's index.
                if (ctx.getSharedObject() != null && i == 0) {
                    PrimaryIndexLogMarkerCallback callback =
                            new PrimaryIndexLogMarkerCallback((AbstractLSMIndex) indexes[0]);
                    TaskUtil.put(ILogMarkerCallback.KEY_MARKER_CALLBACK, callback, ctx);
                }
                // NOTE(review): writer.open() is invoked once per loop iteration, i.e. once per
                // storage partition; the downstream IFrameWriter contract normally expects a single
                // open() — confirm this is intended for partitions.length > 1.
                writer.open();
                writerOpen = true;
                modCallbacks[i] =
                        modOpCallbackFactory.createModificationOperationCallback(indexHelper.getResource(), ctx, this);
                searchCallbacks[i] = (LockThenSearchOperationCallback) searchCallbackFactory
                        .createSearchOperationCallback(indexHelper.getResource().getId(), ctx, this);
                // Upsert accessor: modification callback only, no search callback.
                IIndexAccessParameters iap = new IndexAccessParameters(modCallbacks[i], NoOpOperationCallback.INSTANCE);
                indexAccessors[i] = index.createAccessor(iap);
                if (keyIndexHelper != null) {
                    lsmAccessorForKeyIndexes[i] = (LSMTreeIndexAccessor) indexForUniquessCheck.createAccessor(iap);
                }
                frameOpCallbacks[i] = NoOpFrameOperationCallbackFactory.INSTANCE.createFrameOperationCallback(ctx,
                        (ILSMIndexAccessor) indexAccessors[i]);
                // NOTE(review): frameOpCallbacks[i].open() has no matching close in close() within
                // this view — confirm whether the no-op callback needs one.
                frameOpCallbacks[i].open();
                // Uniqueness-check accessor: search callback only, no modification callback.
                IIndexAccessParameters iapForUniquenessCheck =
                        new IndexAccessParameters(NoOpOperationCallback.INSTANCE, searchCallbacks[i]);
                lsmAccessorForUniqunessChecks[i] =
                        (LSMTreeIndexAccessor) indexForUniquessCheck.createAccessor(iapForUniquenessCheck);
                cursors[i] = lsmAccessorForUniqunessChecks[i].createSearchCursor(false);
                LSMIndexUtil.checkAndSetFirstLSN((AbstractLSMIndex) index,
                        appCtx.getTransactionSubsystem().getLogManager());
            }
            // Processors must be created after the arrays above are fully populated.
            createTupleProcessors(sourceLoc);
            keySearchCmp =
                    BTreeUtils.getSearchMultiComparator(((ITreeIndex) indexes[0]).getComparatorFactories(), frameTuple);
            // NOTE(review): searchPred is seeded with the pre-existing frameTuple, which is then
            // re-assigned on the next line; process() resets searchPred with keyTuple before every
            // search, so the initial low/high keys appear to be unused — confirm.
            searchPred = new RangePredicate(frameTuple, frameTuple, true, true, keySearchCmp, keySearchCmp, null, null);
            appender = new FrameTupleAppender(new VSizeFrame(ctx), true);
            frameTuple = new FrameTupleReference();
        } catch (Throwable e) { // NOSONAR: Re-thrown
            throw HyracksDataException.create(e);
        }
    }

    /**
     * Routes each tuple of the frame to its storage partition, batch-processes each partition's
     * tuples, and forwards the frame downstream (either whole, or via flushPartialFrame() when a
     * duplicate already forced a partial flush).
     */
    @Override
    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
        accessor.reset(buffer);
        partition2TuplesMap.clear();
        int itemCount = accessor.getTupleCount();
        // Group tuple indexes by the storage partition they hash to.
        for (int i = 0; i < itemCount; i++) {
            int storagePartition = tuplePartitioner.partition(accessor, i);
            int pIdx = storagePartitionId2Index.get(storagePartition);
            IntSet tupleIndexes = partition2TuplesMap.computeIfAbsent(pIdx, k -> new IntOpenHashSet());
            tupleIndexes.add(i);
        }
        // Drive each partition's batch through its processor under its frame-operation callback.
        for (Int2ObjectMap.Entry<IntSet> p2tuplesMapEntry : partition2TuplesMap.int2ObjectEntrySet()) {
            int pIdx = p2tuplesMapEntry.getIntKey();
            LSMTreeIndexAccessor lsmAccessor = (LSMTreeIndexAccessor) indexAccessors[pIdx];
            IFrameOperationCallback frameOpCallback = frameOpCallbacks[pIdx];
            IFrameTupleProcessor processor = processors[pIdx];
            lsmAccessor.batchOperate(accessor, tuple, processor, frameOpCallback, p2tuplesMapEntry.getValue());
        }
        writeBuffer.ensureFrameSize(buffer.capacity());
        if (flushedPartialTuples) {
            // Part of the frame already went downstream; only forward the remainder.
            flushPartialFrame();
        } else {
            FrameUtils.copyAndFlip(buffer, writeBuffer.getBuffer());
            FrameUtils.flushFrame(writeBuffer.getBuffer(), writer);
        }
        // Reset the partial-flush bookkeeping for the next frame.
        currentTupleIdx = 0;
        lastFlushedTupleIdx = 0;
        flushedPartialTuples = false;
    }

    /**
     * flushes tuples in a frame from lastFlushedTupleIdx(inclusive) to currentTupleIdx(exclusive)
     */
    @Override
    public void flushPartialFrame() throws HyracksDataException {
        if (lastFlushedTupleIdx == currentTupleIdx) {
            //nothing to flush
            return;
        }
        for (int i = lastFlushedTupleIdx; i < currentTupleIdx; i++) {
            FrameUtils.appendToWriter(writer, appender, accessor, i);
        }
        appender.write(writer, true);
        lastFlushedTupleIdx = currentTupleIdx;
        flushedPartialTuples = true;
    }

    /**
     * Tears down cursors, the downstream writer, and all index dataflow helpers, chaining any
     * failures and rethrowing a single exception at the end.
     */
    @Override
    public void close() throws HyracksDataException {
        Throwable failure = CleanupUtils.destroy(null, cursors);
        failure = CleanupUtils.close(writer, failure);
        failure = CleanupUtils.close(indexHelpers, failure);
        failure = CleanupUtils.close(keyIndexHelpers, failure);
        if (failure != null) {
            throw HyracksDataException.create(failure);
        }
    }

    @Override
    public void fail() throws HyracksDataException {
        // NOTE(review): failure is forwarded unconditionally; writerOpen is set in open() but not
        // consulted here — confirm writer.fail() is safe when open() failed before writer.open().
        writer.fail();
    }

    @Override
    public void flush() throws HyracksDataException {
        // No op since nextFrame flushes by default
    }
}