blob: 6d5e88b0cb0b7441d3ffd156ea5e19350a06681e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.asterix.runtime.operators;
import java.nio.ByteBuffer;
import org.apache.asterix.om.types.ATypeTag;
import org.apache.asterix.om.types.TypeTagUtil;
import org.apache.asterix.transaction.management.opcallbacks.AbstractIndexModificationOperationCallback;
import org.apache.asterix.transaction.management.opcallbacks.AbstractIndexModificationOperationCallback.Operation;
import org.apache.hyracks.algebricks.data.IBinaryIntegerInspector;
import org.apache.hyracks.algebricks.data.IBinaryIntegerInspectorFactory;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.value.ITuplePartitionerFactory;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference;
import org.apache.hyracks.dataflow.common.data.accessors.PermutingFrameTupleReference;
import org.apache.hyracks.dataflow.common.utils.TupleUtils;
import org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
import org.apache.hyracks.storage.am.common.api.ITupleFilter;
import org.apache.hyracks.storage.am.common.api.ITupleFilterFactory;
import org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
import org.apache.hyracks.storage.am.lsm.common.dataflow.LSMIndexInsertUpdateDeleteOperatorNodePushable;
/**
* This operator node is used for secondary indexes with upsert operations.
* It works in the following way:
* For each incoming tuple
* -If old secondary index tuple == new secondary index tuple
* --do nothing
* -else
* --perform the operation based on the operation kind
*/
public class LSMSecondaryUpsertOperatorNodePushable extends LSMIndexInsertUpdateDeleteOperatorNodePushable {

    // Operation codes carried in the designated operation field of each incoming tuple.
    protected static final int UPSERT_NEW = LSMPrimaryUpsertOperatorNodePushable.UPSERT_NEW.getByteValue();
    protected static final int UPSERT_EXISTING = LSMPrimaryUpsertOperatorNodePushable.UPSERT_EXISTING.getByteValue();
    protected static final int DELETE_EXISTING = LSMPrimaryUpsertOperatorNodePushable.DELETE_EXISTING.getByteValue();

    // View of the previous (pre-upsert) secondary-index tuple, extracted via field permutation.
    private final PermutingFrameTupleReference prevTuple = new PermutingFrameTupleReference();
    private final int numberOfFields;
    // Optional filter applied to the previous tuple before deleting it; null means "always accept".
    private final ITupleFilterFactory prevTupleFilterFactory;
    private ITupleFilter prevTupleFilter;

    // Index of the field that encodes the operation kind, and the inspector that decodes it.
    protected final int operationFieldIndex;
    protected final IBinaryIntegerInspector operationInspector;

    public LSMSecondaryUpsertOperatorNodePushable(IHyracksTaskContext ctx, int partition,
            IIndexDataflowHelperFactory indexHelperFactory, IModificationOperationCallbackFactory modCallbackFactory,
            ITupleFilterFactory tupleFilterFactory, ITupleFilterFactory prevTupleFilterFactory, int[] fieldPermutation,
            RecordDescriptor inputRecDesc, int operationFieldIndex,
            IBinaryIntegerInspectorFactory operationInspectorFactory, int[] prevTuplePermutation,
            ITuplePartitionerFactory tuplePartitionerFactory, int[][] partitionsMap) throws HyracksDataException {
        super(ctx, partition, indexHelperFactory, fieldPermutation, inputRecDesc, IndexOperation.UPSERT,
                modCallbackFactory, tupleFilterFactory, tuplePartitionerFactory, partitionsMap);
        this.numberOfFields = fieldPermutation.length;
        this.operationFieldIndex = operationFieldIndex;
        this.operationInspector = operationInspectorFactory.createBinaryIntegerInspector(ctx);
        this.prevTupleFilterFactory = prevTupleFilterFactory;
        this.prevTuple.setFieldPermutation(prevTuplePermutation);
    }

    @Override
    public void open() throws HyracksDataException {
        super.open();
        frameTuple = new FrameTupleReference();
        // The previous-tuple filter (unlike the new-tuple filter handled by the superclass)
        // is instantiated here, lazily per task.
        prevTupleFilter = prevTupleFilterFactory != null ? prevTupleFilterFactory.createTupleFilter(ctx) : null;
    }

    /**
     * Routes every tuple of the frame to its target storage partition and applies the
     * secondary-index modification dictated by the tuple's operation field, then forwards
     * the frame downstream unchanged.
     */
    @Override
    public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
        accessor.reset(buffer);
        final int tupleCount = accessor.getTupleCount();
        for (int tupleIndex = 0; tupleIndex < tupleCount; tupleIndex++) {
            try {
                final int storagePartition = tuplePartitioner.partition(accessor, tupleIndex);
                final int accessorIndex = storagePartitionId2Index.get(storagePartition);
                final ILSMIndexAccessor lsmAccessor = (ILSMIndexAccessor) indexAccessors[accessorIndex];
                final AbstractIndexModificationOperationCallback modCallback =
                        (AbstractIndexModificationOperationCallback) modCallbacks[accessorIndex];
                frameTuple.reset(accessor, tupleIndex);
                final int operation = operationInspector.getIntegerValue(
                        frameTuple.getFieldData(operationFieldIndex), frameTuple.getFieldStart(operationFieldIndex),
                        frameTuple.getFieldLength(operationFieldIndex));
                tuple.reset(accessor, tupleIndex);
                prevTuple.reset(accessor, tupleIndex);
                // Operation codes are runtime-initialized, so an if-chain (not a switch) is required.
                if (operation == UPSERT_NEW) {
                    insertIfAccepted(lsmAccessor, modCallback);
                } else if (operation == UPSERT_EXISTING) {
                    // When the indexed fields did not change, the secondary index needs no update at all.
                    if (!TupleUtils.equalTuples(tuple, prevTuple, numberOfFields)) {
                        deleteIfAccepted(lsmAccessor, modCallback);
                        insertIfAccepted(lsmAccessor, modCallback);
                    }
                } else if (operation == DELETE_EXISTING) {
                    deleteIfAccepted(lsmAccessor, modCallback);
                }
            } catch (Exception e) {
                throw HyracksDataException.create(e);
            }
        }
        // No partial flushing was necessary. Forward entire frame.
        writeBuffer.ensureFrameSize(buffer.capacity());
        FrameUtils.copyAndFlip(buffer, writeBuffer.getBuffer());
        FrameUtils.flushFrame(writeBuffer.getBuffer(), writer);
    }

    /** Force-inserts the new secondary tuple, unless the (optional) new-tuple filter rejects it. */
    private void insertIfAccepted(ILSMIndexAccessor lsmAccessor, AbstractIndexModificationOperationCallback modCallback)
            throws Exception {
        if (tupleFilter == null || tupleFilter.accept(frameTuple)) {
            modCallback.setOp(Operation.INSERT);
            lsmAccessor.forceInsert(tuple);
        }
    }

    /** Force-deletes the previous secondary tuple, unless the (optional) previous-tuple filter rejects it. */
    private void deleteIfAccepted(ILSMIndexAccessor lsmAccessor, AbstractIndexModificationOperationCallback modCallback)
            throws Exception {
        if (prevTupleFilter == null || prevTupleFilter.accept(frameTuple)) {
            modCallback.setOp(Operation.DELETE);
            lsmAccessor.forceDelete(prevTuple);
        }
    }

    /** Returns true iff the given field holds a serialized NULL or MISSING value. */
    private static boolean isNullOrMissing(FrameTupleReference tuple, int fieldIdx) {
        return TypeTagUtil.isType(tuple, fieldIdx, ATypeTag.SERIALIZED_NULL_TYPE_TAG)
                || TypeTagUtil.isType(tuple, fieldIdx, ATypeTag.SERIALIZED_MISSING_TYPE_TAG);
    }

    /** Returns true iff any field of the tuple holds a serialized NULL or MISSING value. */
    protected static boolean hasNullOrMissing(FrameTupleReference tuple) {
        for (int fieldIdx = 0, fieldCount = tuple.getFieldCount(); fieldIdx < fieldCount; fieldIdx++) {
            if (isNullOrMissing(tuple, fieldIdx)) {
                return true;
            }
        }
        return false;
    }
}