blob: 62979b5088a4a670035c1aaf6952d6f86ff15b75 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tez.dag.api;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.tez.client.TezClient;
import org.apache.tez.runtime.api.Processor;
/**
* A {@link PreWarmVertex} is used to specify parameters to be used to setup
* prewarmed containers for Tez session mode. Sessions allow re-use of execution
* slots (containers) across DAG's. Pre- warming allows pre-allocation of
* containers so that the first DAG has some execution resources already
* available to re-use. In order to get re-use containers they must be setup
* identically. So the prewarm vertex must be setup identically to the real DAG
* vertex (typically the first vertex to execute in the read DAG). Identical
* settings include same execution resources, same task local files etc. This
* works best in use cases where all DAGs share the same files/jars/resource
* settings from a common template<br>
* The parallelism of the pre-warm vertex determines the number of containers to
* be pre-warmed. This would ideally ensures a viable number of containers to
* provide performance while sharing resources with other applications.
* Typically the session would also hold onto the same number of containers
* in-between DAGs in session mode via the
* {@link TezConfiguration#TEZ_AM_SESSION_MIN_HELD_CONTAINERS} property. The
* prewarm vertex by default runs the PreWarmProcessor from the Tez runtime
* library. This processor can be overridden to get the default behavior along
* with any app specific customizations. Alternatively, the application can
* provide any {@link Processor} to prewarm the containers. Pre-warming
* processors can be used to initialize classes etc. and setup the environment
* for the actual processing to reduce latency.
*/
@Unstable
@Public
public class PreWarmVertex extends Vertex {
private PreWarmVertex(String vertexName, ProcessorDescriptor processorDescriptor, int parallelism,
Resource taskResource) {
super(vertexName, processorDescriptor, parallelism, taskResource);
}
private PreWarmVertex(String vertexName, int parallelism, Resource taskResource) {
this(vertexName, ProcessorDescriptor.create(
"org.apache.tez.runtime.library.processor.PreWarmProcessor"), parallelism, taskResource);
}
/**
* Create a config builder for the @link {@link PreWarmVertex}. This may be used to construct the
* pre-warm vertex more flexibly.
* @param conf
* @return a new config builder for {@link PreWarmVertex}
*/
public static PreWarmVertexConfigBuilder createConfigBuilder(Configuration conf) {
return new PreWarmVertexConfigBuilder(conf);
}
/**
* Create a {@link PreWarmVertex} to be used in
* {@link TezClient#preWarm(PreWarmVertex)} It may be necessary to call more
* methods to add local files etc on the pre-warm vertex post creation so that
* it matches the real DAG vertices.
*
* @param vertexName
* Name of the vertex
* @param processorDescriptor
* Descriptor of the processor to be run
* @param parallelism
* Number of containers to be pre-warmed
* @param taskResource
* Execution cpu/memory resources etc needed
*/
public static PreWarmVertex create(String vertexName, ProcessorDescriptor processorDescriptor,
int parallelism,
Resource taskResource) {
return new PreWarmVertex(vertexName, processorDescriptor, parallelism, taskResource);
}
/**
* Create a {@link PreWarmVertex} to be used in @link
* {@link TezClient#preWarm(PreWarmVertex)} This uses a built in pre-warm
* processor that implements common functionality. Users may derive from this
* processor to add custom functionality but then they must add the jar for
* that class to the prewarm vertex and other vertices in their DAG for which
* they want the containers to be reused. It may be necessary to call more
* methods to add local files etc on the pre-warm vertex post creation so that
* it matches the real DAG vertices.
*
* @param vertexName
* Name of the vertex
* @param parallelism
* Number of containers to be pre-warmed
* @param taskResource
* Execution cpu/memory resources etc needed
*/
public static PreWarmVertex create(String vertexName, int parallelism, Resource taskResource) {
return new PreWarmVertex(vertexName, parallelism, taskResource);
}
/**
* Setup the prewarm vertex constructor. By default is uses the built-in
* PreWarmProcessor and sets up the prewarm container number equal to
* {@link TezConfiguration#TEZ_AM_SESSION_MIN_HELD_CONTAINERS}
*/
public static class PreWarmVertexConfigBuilder {
String name;
int parallelism;
ProcessorDescriptor proc;
Resource resource;
Configuration conf;
PreWarmVertexConfigBuilder(Configuration conf) {
this.conf = conf;
}
public PreWarmVertexConfigBuilder setName(String name) {
this.name = name;
return this;
}
public PreWarmVertexConfigBuilder setProcessorDescriptor(ProcessorDescriptor proc) {
this.proc = proc;
return this;
}
public PreWarmVertexConfigBuilder setResource(Resource resource) {
this.resource = resource;
return this;
}
public PreWarmVertexConfigBuilder setParallelism(int parallelism) {
this.parallelism = parallelism;
return this;
}
public PreWarmVertex build() {
if (name == null) {
name = "_PreWarm_";
}
if (parallelism == 0) {
parallelism = conf.getInt(TezConfiguration.TEZ_AM_SESSION_MIN_HELD_CONTAINERS, -1);
if (parallelism == -1) {
throw new TezUncheckedException("Prewarm parallelism must be set or specified in conf via "
+ TezConfiguration.TEZ_AM_SESSION_MIN_HELD_CONTAINERS);
}
}
if (proc == null) {
proc =
ProcessorDescriptor.create("org.apache.tez.runtime.library.processor.PreWarmProcessor");
}
return create(name, proc, parallelism, resource);
}
}
}