blob: caf5acd588f05bcf65992adf2d9a7db15d1f2709 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tez.dag.library.vertexmanager;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.hadoop.conf.Configuration;
import org.apache.tez.common.TezUtils;
import org.apache.tez.dag.api.EdgeProperty;
import org.apache.tez.dag.api.EdgeProperty.ConcurrentEdgeTriggerType;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.api.VertexManagerPlugin;
import org.apache.tez.dag.api.VertexManagerPluginContext;
import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
import org.apache.tez.dag.api.event.VertexState;
import org.apache.tez.dag.api.event.VertexStateUpdate;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.TaskAttemptIdentifier;
import org.apache.tez.runtime.api.events.VertexManagerEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.io.IOException;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.apache.tez.dag.api.EdgeProperty.SchedulingType.CONCURRENT;
public class VertexManagerWithConcurrentInput extends VertexManagerPlugin {
private static final Logger LOG = LoggerFactory.getLogger(VertexManagerWithConcurrentInput.class);
private final Map<String, Boolean> srcVerticesConfigured = Maps.newConcurrentMap();
private int managedTasks;
private AtomicBoolean tasksScheduled = new AtomicBoolean(false);
private AtomicBoolean onVertexStartedDone = new AtomicBoolean(false);
private Configuration vertexConfig;
private String vertexName;
private ConcurrentEdgeTriggerType edgeTriggerType;
private volatile boolean allSrcVerticesConfigured;
int completedUpstreamTasks;
public VertexManagerWithConcurrentInput(VertexManagerPluginContext context) {
super(context);
}
@Override
public void initialize() {
UserPayload userPayload = getContext().getUserPayload();
if (userPayload == null || userPayload.getPayload() == null ||
userPayload.getPayload().limit() == 0) {
throw new TezUncheckedException("Could not initialize VertexManagerWithConcurrentInput"
+ " from provided user payload");
}
managedTasks = getContext().getVertexNumTasks(getContext().getVertexName());
Map<String, EdgeProperty> edges = getContext().getInputVertexEdgeProperties();
for (Map.Entry<String, EdgeProperty> entry : edges.entrySet()) {
if (!CONCURRENT.equals(entry.getValue().getSchedulingType())) {
throw new TezUncheckedException("All input edges to vertex " + vertexName +
" must be CONCURRENT.");
}
String srcVertex = entry.getKey();
srcVerticesConfigured.put(srcVertex, false);
getContext().registerForVertexStateUpdates(srcVertex, EnumSet.of(VertexState.CONFIGURED));
}
try {
vertexConfig = TezUtils.createConfFromUserPayload(getContext().getUserPayload());
} catch (IOException e) {
throw new TezUncheckedException(e);
}
edgeTriggerType = ConcurrentEdgeTriggerType.valueOf(
vertexConfig.get(TezConfiguration.TEZ_CONCURRENT_EDGE_TRIGGER_TYPE,
TezConfiguration.TEZ_CONCURRENT_EDGE_TRIGGER_TYPE_DEFAULT));
if (!ConcurrentEdgeTriggerType.SOURCE_VERTEX_CONFIGURED.equals(edgeTriggerType)) {
// pending TEZ-3999
throw new TezUncheckedException("Only support SOURCE_VERTEX_CONFIGURED triggering type for now.");
}
LOG.info("VertexManagerWithConcurrentInput initialized with edgeTriggerType {}.", edgeTriggerType);
vertexName = getContext().getVertexName();
completedUpstreamTasks = 0;
}
@Override
public synchronized void onVertexStarted(List<TaskAttemptIdentifier> completions) {
onVertexStartedDone.set(true);
scheduleTasks();
}
@Override
public synchronized void onVertexStateUpdated(VertexStateUpdate stateUpdate) {
VertexState state = stateUpdate.getVertexState();
String fromVertex = stateUpdate.getVertexName();
if (!srcVerticesConfigured.containsKey(fromVertex)) {
throw new IllegalArgumentException("Not expecting state update from vertex:" +
fromVertex + " in vertex: " + this.vertexName);
}
if (!VertexState.CONFIGURED.equals(state)) {
throw new IllegalArgumentException("Received incorrect state notification : " +
state + " from vertex: " + fromVertex + " in vertex: " + this.vertexName);
}
LOG.info("Received configured notification: " + state + " for vertex: "
+ fromVertex + " in vertex: " + this.vertexName);
srcVerticesConfigured.put(fromVertex, true);
// check for source vertices completely configured
boolean checkAllSrcVerticesConfigured = true;
for (Map.Entry<String, Boolean> entry : srcVerticesConfigured.entrySet()) {
if (!entry.getValue()) {
// vertex not configured
LOG.info("Waiting for vertex {} in vertex {} ", entry.getKey(), this.vertexName);
checkAllSrcVerticesConfigured = false;
break;
}
}
allSrcVerticesConfigured = checkAllSrcVerticesConfigured;
scheduleTasks();
}
@Override
public synchronized void onSourceTaskCompleted(TaskAttemptIdentifier attempt) {
completedUpstreamTasks ++;
LOG.info("Source task attempt {} completion received at vertex {}", attempt, this.vertexName);
}
@Override
public void onVertexManagerEventReceived(VertexManagerEvent vmEvent) {
}
@Override
public void onRootVertexInitialized(String inputName,
InputDescriptor inputDescriptor, List<Event> events) {
}
private void scheduleTasks() {
if (!onVertexStartedDone.get()) {
// vertex not started yet
return;
}
if (tasksScheduled.get()) {
// already scheduled
return;
}
if (!canScheduleTasks()) {
return;
}
tasksScheduled.compareAndSet(false, true);
List<VertexManagerPluginContext.ScheduleTaskRequest> tasksToStart = Lists.newArrayListWithCapacity(managedTasks);
for (int i = 0; i < managedTasks; ++i) {
tasksToStart.add(VertexManagerPluginContext.ScheduleTaskRequest.create(i, null));
}
if (!tasksToStart.isEmpty()) {
LOG.info("Starting {} tasks in {}.", tasksToStart.size(), this.vertexName);
getContext().scheduleTasks(tasksToStart);
}
// all tasks scheduled. Can call vertexManagerDone().
}
private boolean canScheduleTasks() {
if (edgeTriggerType.equals(ConcurrentEdgeTriggerType.SOURCE_VERTEX_CONFIGURED)) {
return allSrcVerticesConfigured;
} else {
// pending TEZ-3999
throw new TezUncheckedException("Only support SOURCE_VERTEX_CONFIGURED triggering type for now.");
}
}
/**
* Create a {@link VertexManagerPluginDescriptor} builder that can be used to
* configure the plugin.
*
* @param conf
* {@link Configuration} May be modified in place. May be null if the
* configuration parameters are to be set only via code. If
* configuration values may be changed at runtime via a config file
* then pass in a {@link Configuration} that is initialized from a
* config file. The parameters that are not overridden in code will
* be derived from the Configuration object.
* @return {@link ConcurrentInputVertexManagerConfigBuilder}
*/
public static ConcurrentInputVertexManagerConfigBuilder createConfigBuilder(
@Nullable Configuration conf) {
return new ConcurrentInputVertexManagerConfigBuilder(conf);
}
/**
* Helper class to configure VertexManagerWithConcurrentInput
*/
public static final class ConcurrentInputVertexManagerConfigBuilder {
private final Configuration conf;
private ConcurrentInputVertexManagerConfigBuilder(@Nullable Configuration conf) {
if (conf == null) {
this.conf = new Configuration(false);
} else {
this.conf = conf;
}
}
public VertexManagerPluginDescriptor build() {
VertexManagerPluginDescriptor desc =
VertexManagerPluginDescriptor.create(
VertexManagerWithConcurrentInput.class.getName());
try {
return desc.setUserPayload(TezUtils
.createUserPayloadFromConf(this.conf));
} catch (IOException e) {
throw new TezUncheckedException(e);
}
}
}
}