blob: 7f74bd196a5bb574500f59f105afa627a2bb7272 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package com.datatorrent.stram.plan.physical;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.lang.builder.ToStringBuilder;
import org.apache.commons.lang.builder.ToStringStyle;
import com.esotericsoftware.kryo.KryoException;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.datatorrent.api.Context;
import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.DAG.Locality;
import com.datatorrent.api.Operator.InputPort;
import com.datatorrent.api.Partitioner.PartitionKeys;
import com.datatorrent.api.StatsListener;
import com.datatorrent.api.annotation.Stateless;
import com.datatorrent.stram.Journal.Recoverable;
import com.datatorrent.stram.api.Checkpoint;
import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol;
import com.datatorrent.stram.engine.WindowGenerator;
import com.datatorrent.stram.plan.logical.LogicalPlan;
import com.datatorrent.stram.plan.logical.LogicalPlan.InputPortMeta;
import com.datatorrent.stram.plan.logical.LogicalPlan.OperatorMeta;
import com.datatorrent.stram.plan.logical.LogicalPlan.StreamMeta;
/**
*
* Representation of an operator in the physical layout<p>
* <br>
*
* @since 0.3.5
*/
public class PTOperator implements java.io.Serializable
{
private static final long serialVersionUID = 201312112033L;
public static final Recoverable SET_OPERATOR_STATE = new SetOperatorState();
public enum State
{
PENDING_DEPLOY,
ACTIVE,
PENDING_UNDEPLOY,
INACTIVE
}
/**
*
* Representation of an input in the physical layout. A source in the DAG
* <p>
* <br>
*/
public static class PTInput implements java.io.Serializable
{
private static final long serialVersionUID = 201312112033L;
public final LogicalPlan.StreamMeta logicalStream;
public final PTOperator target;
public final PartitionKeys partitions;
public final PTOutput source;
public final String portName;
public final boolean delay;
/**
*
* @param portName
* @param logicalStream
* @param target
* @param partitions
* @param source
*/
protected PTInput(String portName, StreamMeta logicalStream, PTOperator target, PartitionKeys partitions, PTOutput source, boolean delay)
{
this.logicalStream = logicalStream;
this.target = target;
this.partitions = partitions;
this.source = source;
this.portName = portName;
this.source.sinks.add(this);
this.delay = delay;
}
/**
*
* @return String
*/
@Override
public String toString()
{
return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE).append("o", this.target).append("port", this.portName).append("source", this.source).toString();
}
}
/**
*
* Representation of an output in the physical layout. A sink in the DAG
* <p>
* <br>
*/
public static class PTOutput implements java.io.Serializable
{
private static final long serialVersionUID = 201312112033L;
public final LogicalPlan.StreamMeta logicalStream;
public final PTOperator source;
public final String portName;
public final List<PTInput> sinks;
/**
* Constructor
*
* @param portName
* @param logicalStream
* @param source
*/
protected PTOutput(String portName, StreamMeta logicalStream, PTOperator source)
{
this.logicalStream = logicalStream;
this.source = source;
this.portName = portName;
this.sinks = new ArrayList<>();
}
/**
* Determine whether downstream operators are deployed inline. (all
* instances of the downstream operator are in the same container)
*
* @return boolean
*/
public boolean isDownStreamInline()
{
for (PTInput sink : this.sinks) {
if (this.source.container != sink.target.container) {
return false;
}
}
return true;
}
public Set<PTOperator> threadLocalSinks()
{
Set<PTOperator> threadLocalOperators = null;
if (logicalStream != null && logicalStream.getLocality() == Locality.THREAD_LOCAL) {
threadLocalOperators = new HashSet<>();
for (PTInput sink : this.sinks) {
threadLocalOperators.add(sink.target);
}
}
return threadLocalOperators;
}
/**
* @return String
*/
@Override
public String toString()
{
return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE)
.append("o", this.source)
.append("port", this.portName)
.append("stream", this.logicalStream.getName())
.toString();
}
}
private static class SetOperatorState implements Recoverable
{
private final int operatorId;
private final PTOperator.State state;
private SetOperatorState()
{
operatorId = -1;
state = PTOperator.State.INACTIVE;
}
private SetOperatorState(int operatorId, PTOperator.State state)
{
this.operatorId = operatorId;
this.state = state;
}
@Override
public void read(final Object object, final Input in) throws KryoException
{
PhysicalPlan plan = (PhysicalPlan)object;
int operatorId = in.readInt();
int stateOrd = in.readInt();
plan.getAllOperators().get(operatorId).state = PTOperator.State.values()[stateOrd];
}
@Override
public void write(final Output out) throws KryoException
{
out.writeInt(operatorId);
out.writeInt(state.ordinal());
}
}
PTOperator(PhysicalPlan plan, int id, String name, OperatorMeta om)
{
this.checkpoints = new LinkedList<>();
this.plan = plan;
this.name = name;
this.id = id;
this.operatorMeta = om;
this.stats = new OperatorStatus(this.id, om);
}
private volatile PTOperator.State state = State.INACTIVE;
private final PhysicalPlan plan;
PTContainer container;
final LogicalPlan.OperatorMeta operatorMeta;
final int id;
private final String name;
Map<InputPortMeta, PartitionKeys> partitionKeys;
OperatorMeta unifiedOperatorMeta; /* this is the meta for the operator for which this unifier exists */
List<PTInput> inputs;
List<PTOutput> outputs;
public final LinkedList<Checkpoint> checkpoints;
Checkpoint recoveryCheckpoint;
public int failureCount = 0;
public int loadIndicator = 0;
public List<? extends StatsListener.StatsListenerWithContext> statsListeners;
public final OperatorStatus stats;
final Map<Locality, HostOperatorSet> groupings = Maps.newHashMapWithExpectedSize(3);
public List<StreamingContainerUmbilicalProtocol.StramToNodeRequest> deployRequests = Collections.emptyList();
public final HashMap<InputPortMeta, PTOperator> upstreamMerge = new HashMap<>();
/**
* @return Operator
*/
public OperatorMeta getOperatorMeta()
{
return this.operatorMeta;
}
public PTOperator.State getState()
{
return state;
}
public void setState(PTOperator.State state)
{
this.getPlan().getContext().writeJournal(new SetOperatorState(getId(), state));
this.state = state;
}
/**
* Return the most recent checkpoint for this operator,
* representing the last getSaveStream reported.
*
* @return long
*/
public Checkpoint getRecentCheckpoint()
{
if (checkpoints.isEmpty()) {
return Checkpoint.INITIAL_CHECKPOINT;
}
return checkpoints.getLast();
}
/**
* Return the checkpoint that can be used for recovery. This may not be the
* most recent checkpoint, depending on downstream state.
*
* @return long
*/
public Checkpoint getRecoveryCheckpoint()
{
return recoveryCheckpoint;
}
public void setRecoveryCheckpoint(Checkpoint recoveryCheckpoint)
{
this.recoveryCheckpoint = recoveryCheckpoint;
}
/**
*
* @return String
*/
public String getLogicalId()
{
return operatorMeta.getName();
}
public int getId()
{
return id;
}
public String getName()
{
return name;
}
public PhysicalPlan getPlan()
{
return plan;
}
public List<PTInput> getInputs()
{
return inputs;
}
public List<PTOutput> getOutputs()
{
return outputs;
}
public PTContainer getContainer()
{
return container;
}
public Map<InputPort<?>, PartitionKeys> getPartitionKeys()
{
Map<InputPort<?>, PartitionKeys> pkeys = null;
if (partitionKeys != null) {
pkeys = Maps.newHashMapWithExpectedSize(partitionKeys.size());
for (Map.Entry<InputPortMeta, PartitionKeys> e : partitionKeys.entrySet()) {
pkeys.put(e.getKey().getPort(), e.getValue());
}
}
return pkeys;
}
public int getBufferServerMemory()
{
int bufferServerMemory = 0;
for (int i = 0; i < outputs.size(); i++) {
if (!outputs.get(i).isDownStreamInline()) {
bufferServerMemory += outputs.get(i).logicalStream.getSource().getValue(Context.PortContext.BUFFER_MEMORY_MB);
}
}
return bufferServerMemory;
}
public Set<PTOperator> getThreadLocalOperators()
{
Set<PTOperator> threadLocalOperators = null;
for (int i = 0; i < outputs.size(); i++) {
if (outputs.get(i).logicalStream != null && outputs.get(i).logicalStream.getLocality() == Locality.THREAD_LOCAL) {
if (threadLocalOperators == null) {
threadLocalOperators = new HashSet<>();
}
threadLocalOperators.addAll(outputs.get(i).threadLocalSinks());
}
}
return threadLocalOperators;
}
public void setPartitionKeys(Map<InputPort<?>, PartitionKeys> portKeys)
{
if (portKeys == null) {
this.partitionKeys = Collections.emptyMap();
return;
}
HashMap<LogicalPlan.InputPortMeta, PartitionKeys> partitionKeys = Maps.newHashMapWithExpectedSize(portKeys.size());
for (Map.Entry<InputPort<?>, PartitionKeys> portEntry : portKeys.entrySet()) {
LogicalPlan.InputPortMeta pportMeta = operatorMeta.getMeta(portEntry.getKey());
if (pportMeta == null) {
throw new AssertionError("Invalid port reference " + portEntry);
}
partitionKeys.put(pportMeta, portEntry.getValue());
}
this.partitionKeys = partitionKeys;
}
public boolean isUnifier()
{
return unifiedOperatorMeta != null;
}
public OperatorMeta getUnifiedOperatorMeta()
{
return unifiedOperatorMeta;
}
public Class<?> getUnifierClass()
{
if (unifiedOperatorMeta == null) {
throw new IllegalStateException("OperatorMeta does not support this method, please call isUnifier before making this call");
}
return operatorMeta.getOperator().getClass();
}
public boolean isOperatorStateLess()
{
if (operatorMeta.getDAG().getValue(OperatorContext.STATELESS) || operatorMeta.getValue(OperatorContext.STATELESS)) {
return true;
}
return operatorMeta.getOperator().getClass().isAnnotationPresent(Stateless.class);
}
public Checkpoint addCheckpoint(long windowId, long startTime)
{
int widthMillis = operatorMeta.getDAG().getValue(LogicalPlan.STREAMING_WINDOW_SIZE_MILLIS);
long millis = WindowGenerator.getNextWindowMillis(windowId, startTime, widthMillis);
long count = WindowGenerator.getWindowCount(millis, startTime, widthMillis);
Checkpoint c = new Checkpoint(windowId, (int)(count % operatorMeta.getValue(OperatorContext.APPLICATION_WINDOW_COUNT)), (int)(count % operatorMeta.getValue(OperatorContext.CHECKPOINT_WINDOW_COUNT)));
this.checkpoints.add(c);
return c;
}
HostOperatorSet getGrouping(Locality type)
{
HostOperatorSet grpObj = this.groupings.get(type);
if (grpObj == null) {
grpObj = new HostOperatorSet();
grpObj.operatorSet = Sets.newHashSet();
this.groupings.put(type, grpObj);
}
return grpObj;
}
public HostOperatorSet getNodeLocalOperators()
{
return getGrouping(Locality.NODE_LOCAL);
}
public class HostOperatorSet implements java.io.Serializable
{
private static final long serialVersionUID = 201312112033L;
private String host;
private Set<PTOperator> operatorSet;
public String getHost()
{
return host;
}
public void setHost(String host)
{
this.host = host;
}
public Set<PTOperator> getOperatorSet()
{
return operatorSet;
}
public void setOperatorSet(Set<PTOperator> operatorSet)
{
this.operatorSet = operatorSet;
}
}
/**
*
* @return String
*/
@Override
public String toString()
{
return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE).append("id", id).append("name", name).append("state", state).toString();
}
}