blob: 4c728cec22509ee0ff0e650f8cc2db947190c714 [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
package org.apache.hyracks.dataflow.std.base;
import java.nio.ByteBuffer;
import org.apache.hyracks.api.comm.IFrameWriter;
import org.apache.hyracks.api.comm.VSizeFrame;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.ActivityId;
import org.apache.hyracks.api.dataflow.IActivityGraphBuilder;
import org.apache.hyracks.api.dataflow.IOperatorNodePushable;
import org.apache.hyracks.api.dataflow.TaskId;
import org.apache.hyracks.api.dataflow.value.IRecordDescriptorProvider;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.job.IOperatorDescriptorRegistry;
import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
import org.apache.hyracks.dataflow.std.misc.MaterializerTaskState;
* Abstract class for two replication related operator descriptor - replicate and split
* Replicate operator propagates all frames to all output branches.
* That is, each tuple will be propagated to all output branches.
* Split operator propagates each tuple in a frame to one output branch only.
public abstract class AbstractReplicateOperatorDescriptor extends AbstractOperatorDescriptor {
protected static final long serialVersionUID = 1L;
protected final static int SPLITTER_MATERIALIZER_ACTIVITY_ID = 0;
protected final static int MATERIALIZE_READER_ACTIVITY_ID = 1;
protected final boolean[] outputMaterializationFlags;
protected final boolean requiresMaterialization;
protected final int numberOfNonMaterializedOutputs;
protected final int numberOfMaterializedOutputs;
public AbstractReplicateOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor rDesc,
int outputArity) {
this(spec, rDesc, outputArity, new boolean[outputArity]);
public AbstractReplicateOperatorDescriptor(IOperatorDescriptorRegistry spec, RecordDescriptor rDesc,
int outputArity, boolean[] outputMaterializationFlags) {
super(spec, 1, outputArity);
for (int i = 0; i < outputArity; i++) {
outRecDescs[i] = rDesc;
this.outputMaterializationFlags = outputMaterializationFlags;
boolean reqMaterialization = false;
int matOutputs = 0;
int nonMatOutputs = 0;
for (boolean flag : outputMaterializationFlags) {
if (flag) {
reqMaterialization = true;
} else {
this.requiresMaterialization = reqMaterialization;
this.numberOfMaterializedOutputs = matOutputs;
this.numberOfNonMaterializedOutputs = nonMatOutputs;
public void contributeActivities(IActivityGraphBuilder builder) {
ReplicatorMaterializerActivityNode sma =
new ReplicatorMaterializerActivityNode(new ActivityId(odId, SPLITTER_MATERIALIZER_ACTIVITY_ID));
builder.addActivity(this, sma);
builder.addSourceEdge(0, sma, 0);
int pipelineOutputIndex = 0;
for (int i = 0; i < outputArity; i++) {
if (outputMaterializationFlags[i]) {
MaterializeReaderActivityNode mra =
new MaterializeReaderActivityNode(new ActivityId(odId, activityId++));
builder.addActivity(this, mra);
builder.addBlockingEdge(sma, mra);
builder.addTargetEdge(i, mra, 0);
} else {
builder.addTargetEdge(i, sma, pipelineOutputIndex++);
protected class ReplicatorMaterializerActivityNode extends AbstractActivityNode {
private static final long serialVersionUID = 1L;
public ReplicatorMaterializerActivityNode(ActivityId id) {
public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions)
throws HyracksDataException {
return new AbstractUnaryInputOperatorNodePushable() {
private MaterializerTaskState state;
private final IFrameWriter[] writers = new IFrameWriter[numberOfNonMaterializedOutputs];
private final boolean[] isOpen = new boolean[numberOfNonMaterializedOutputs];
public void open() throws HyracksDataException {
if (requiresMaterialization) {
state = new MaterializerTaskState(ctx.getJobletContext().getJobId(),
new TaskId(getActivityId(), partition), numberOfMaterializedOutputs);;
for (int i = 0; i < numberOfNonMaterializedOutputs; i++) {
isOpen[i] = true;
public void nextFrame(ByteBuffer bufferAccessor) throws HyracksDataException {
if (requiresMaterialization) {
for (int i = 0; i < numberOfNonMaterializedOutputs; i++) {
FrameUtils.flushFrame(bufferAccessor, writers[i]);
public void flush() throws HyracksDataException {
for (int i = 0; i < numberOfNonMaterializedOutputs; i++) {
public void close() throws HyracksDataException {
HyracksDataException hde = null;
try {
if (requiresMaterialization) {
} finally {
for (int i = 0; i < numberOfNonMaterializedOutputs; i++) {
if (isOpen[i]) {
try {
} catch (Throwable th) {
if (hde == null) {
hde = HyracksDataException.create(th);
} else {
if (hde != null) {
throw hde;
public void fail() throws HyracksDataException {
// TODO: shouldn't we fail the MaterializerTaskState state?
HyracksDataException hde = null;
for (int i = 0; i < numberOfNonMaterializedOutputs; i++) {
if (isOpen[i]) {
try {
} catch (Throwable th) {
if (hde == null) {
hde = HyracksDataException.create(th);
} else {
if (hde != null) {
throw hde;
public void setOutputFrameWriter(int index, IFrameWriter writer, RecordDescriptor recordDesc) {
writers[index] = writer;
protected class MaterializeReaderActivityNode extends AbstractActivityNode {
private static final long serialVersionUID = 1L;
public MaterializeReaderActivityNode(ActivityId id) {
public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
final IRecordDescriptorProvider recordDescProvider, final int partition, int nPartitions)
throws HyracksDataException {
return new AbstractUnaryOutputSourceOperatorNodePushable() {
public void initialize() throws HyracksDataException {
MaterializerTaskState state = (MaterializerTaskState) ctx.getStateObject(
new TaskId(new ActivityId(getOperatorId(), SPLITTER_MATERIALIZER_ACTIVITY_ID), partition));
state.writeOut(writer, new VSizeFrame(ctx), false);