blob: a047889a95f5d377ae21db8bce4386d6502f2489 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.operators.spec;
import java.io.Serializable;
import java.util.Collection;
import java.util.LinkedHashSet;
import org.apache.samza.annotation.InterfaceStability;
import org.apache.samza.operators.MessageStream;
import org.apache.samza.operators.MessageStreamImpl;
import org.apache.samza.operators.functions.ScheduledFunction;
import org.apache.samza.operators.functions.WatermarkFunction;
/**
 * A stream operator specification that holds all the information required to transform
 * the input {@link MessageStreamImpl} and produce the output
 * {@link MessageStreamImpl}.
 * <p>
 * Each spec records its {@link OpCode}, its operator ID, the stack trace captured when it was
 * constructed (used by {@link #getSourceLocation()}), and the insertion-ordered set of downstream
 * specs registered through {@link #registerNextOperatorSpec(OperatorSpec)}.
 *
 * @param <M> the type of input message to the operator
 * @param <OM> the type of output message from the operator
 */
@InterfaceStability.Unstable
public abstract class OperatorSpec<M, OM> implements Serializable {

  /**
   * Identifies the kind of operator that an {@link OperatorSpec} represents.
   */
  public enum OpCode {
    INPUT,
    MAP,
    FLAT_MAP,
    FILTER,
    SINK,
    SEND_TO,
    JOIN,
    WINDOW,
    MERGE,
    PARTITION_BY,
    OUTPUT,
    BROADCAST,
    ASYNC_FLAT_MAP
  }

  /**
   * Index in the creation stack trace at which the caller of the {@link MessageStreamImpl} operator
   * method is expected; see the frame layout described in {@link #getSourceLocation()}.
   */
  private static final int USER_CODE_FRAME_INDEX = 5;

  /** Value returned by {@link #getSourceLocation()} when no stack frames were captured. */
  private static final String UNKNOWN_SOURCE_LOCATION = "unknown";

  private final String opId;
  private final OpCode opCode;
  // Captured once in the constructor. Left non-final on purpose: the modifiers of non-transient
  // fields feed into the default computed serialVersionUID of this Serializable class.
  private StackTraceElement[] creationStackTrace;

  /**
   * The set of operators that consume the messages produced from this operator.
   * <p>
   * We use a LinkedHashSet since we need both deterministic ordering in initializing/closing operators and serializability.
   */
  private final LinkedHashSet<OperatorSpec<OM, ?>> nextOperatorSpecs = new LinkedHashSet<>();

  /**
   * Checks whether {@code other} looks like a copy of this spec rather than this spec itself.
   * This method is used in unit tests to verify an {@link OperatorSpec} instance is a deserialized
   * copy of this object.
   *
   * @param other the spec to compare against; must not be {@code null}
   * @return {@code true} if {@code other} is a different instance whose class is this spec's class
   *         (or a subclass of it) and whose {@link OpCode} and operator ID equal this spec's
   */
  final boolean isClone(OperatorSpec<?, ?> other) {
    return this != other && this.getClass().isAssignableFrom(other.getClass())
        && this.opCode.equals(other.opCode) && this.opId.equals(other.opId);
  }

  /**
   * Creates a spec with the given code and ID, and records the current thread's stack trace so that
   * {@link #getSourceLocation()} can later report where the operator was created.
   *
   * @param opCode the {@link OpCode} for this operator
   * @param opId the ID of this operator
   */
  public OperatorSpec(OpCode opCode, String opId) {
    this.opCode = opCode;
    this.opId = opId;
    this.creationStackTrace = Thread.currentThread().getStackTrace();
  }

  /**
   * Register the next operator spec in the chain that this operator should propagate its output to.
   * Registering the same spec more than once has no additional effect, since specs are kept in a set.
   *
   * @param nextOperatorSpec the next operator in the chain.
   */
  public void registerNextOperatorSpec(OperatorSpec<OM, ?> nextOperatorSpec) {
    nextOperatorSpecs.add(nextOperatorSpec);
  }

  /**
   * Get the collection of chained {@link OperatorSpec}s that are consuming the output of this node,
   * in registration order. The returned collection is the live backing set, not a copy.
   *
   * @return the collection of chained {@link OperatorSpec}s
   */
  public Collection<OperatorSpec<OM, ?>> getRegisteredOperatorSpecs() {
    return nextOperatorSpecs;
  }

  /**
   * Get the {@link OpCode} for this operator.
   * @return the {@link OpCode} for this operator
   */
  public final OpCode getOpCode() {
    return this.opCode;
  }

  /**
   * Get the unique ID of this operator in the
   * {@link org.apache.samza.application.descriptors.StreamApplicationDescriptorImpl}.
   * @return the unique operator ID
   */
  public final String getOpId() {
    return this.opId;
  }

  /**
   * Get the user source code location that created the operator.
   * <p>
   * The location is derived from the stack trace captured at construction time. If that trace has
   * fewer frames than expected, the search starts at the outermost available frame instead; if no
   * frames were captured at all, {@code "unknown"} is returned.
   *
   * @return source code location for the operator, formatted as {@code fileName:lineNumber}
   */
  public final String getSourceLocation() {
    final StackTraceElement[] trace = this.creationStackTrace;
    // Thread.getStackTrace() is permitted to omit frames (even to return a zero-length array),
    // so never assume that the expected frame index exists.
    if (trace == null || trace.length == 0) {
      return UNKNOWN_SOURCE_LOCATION;
    }

    // The stack trace for most operators looks like:
    // [0] Thread.getStackTrace()
    // [1] OperatorSpec.<init>()
    // [2] SomeOperatorSpec.<init>()
    // [3] OperatorSpecs.createSomeOperatorSpec()
    // [4] MessageStreamImpl.someOperator()
    // [5] User/MessageStreamImpl code that calls [4]
    // We are interested in the first call at or below [5] that originates from user code.
    final int start = Math.min(USER_CODE_FRAME_INDEX, trace.length - 1);
    StackTraceElement element = trace[start];

    // Sometimes [5] above is a call from MessageStream/MessageStreamImpl itself (e.g. for
    // MessageStream#mergeAll or MessageStreamImpl#partitionBy). If that's the case, find the
    // first call from a class other than these; if there is none, keep the starting frame.
    for (int i = start; i < trace.length; i++) {
      if (!isMessageStreamFrame(trace[i])) {
        element = trace[i];
        break;
      }
    }
    return String.format("%s:%s", element.getFileName(), element.getLineNumber());
  }

  /** Tells whether the frame belongs to {@link MessageStream} or {@link MessageStreamImpl}. */
  private static boolean isMessageStreamFrame(StackTraceElement frame) {
    final String className = frame.getClassName();
    return className.equals(MessageStreamImpl.class.getName())
        || className.equals(MessageStream.class.getName());
  }

  /**
   * Get the {@link WatermarkFunction} for this operator.
   * NOTE(review): whether implementations may return {@code null} is not visible here; confirm
   * against subclasses before relying on a non-null result.
   *
   * @return the {@link WatermarkFunction} supplied by the concrete operator spec
   */
  public abstract WatermarkFunction getWatermarkFn();

  /**
   * Get the {@link ScheduledFunction} for this operator.
   * NOTE(review): whether implementations may return {@code null} is not visible here; confirm
   * against subclasses before relying on a non-null result.
   *
   * @return the {@link ScheduledFunction} supplied by the concrete operator spec
   */
  public abstract ScheduledFunction getScheduledFn();
}