blob: db43c91f7a0fb927bc2427dda736a13f8e27422b [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tez.dag.api;
import javax.annotation.Nullable;
import java.net.URI;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.tez.dag.api.TaskLocationHint;
import org.apache.tez.runtime.api.InputInitializer;
import org.apache.tez.runtime.api.events.InputDataInformationEvent;
import com.google.common.collect.Sets;
/**
 * Defines the input and input initializer for a data source.
 *
 * <p>Instances are created through the static {@code create} factories. All
 * fields are fixed at construction time except the set of URIs for which
 * credentials are to be obtained, which can grow through
 * {@link #addURIsForCredentials(Collection)}. Access to that set is guarded
 * by this instance's monitor.
 */
@Public
public class DataSourceDescriptor {
  private final InputDescriptor inputDescriptor;
  private final InputInitializerDescriptor initializerDescriptor;
  private final Credentials credentials;
  // -1 when the number of shards is not known at construction time.
  private final int numShards;
  private final VertexLocationHint locationHint;
  private final Map<String, LocalResource> additionalLocalFiles;
  // Guarded by "this": mutated in addURIsForCredentials, copied in getURIsForCredentials.
  private final Collection<URI> urisForCredentials = Sets.newHashSet();

  /**
   * Builds a descriptor whose shard count is left undetermined (-1) and which
   * carries neither a location hint nor additional local files.
   */
  private DataSourceDescriptor(InputDescriptor inputDescriptor,
                               @Nullable InputInitializerDescriptor initializerDescriptor,
                               @Nullable Credentials credentials) {
    this(inputDescriptor, initializerDescriptor, -1, credentials, null, null);
  }

  /**
   * Builds a fully specified descriptor. The arguments are stored as given;
   * no copies are made.
   */
  private DataSourceDescriptor(InputDescriptor inputDescriptor,
                               @Nullable InputInitializerDescriptor initializerDescriptor,
                               int numShards,
                               @Nullable Credentials credentials,
                               @Nullable VertexLocationHint locationHint,
                               @Nullable Map<String, LocalResource> additionalLocalFiles) {
    this.inputDescriptor = inputDescriptor;
    this.initializerDescriptor = initializerDescriptor;
    this.numShards = numShards;
    this.credentials = credentials;
    this.locationHint = locationHint;
    this.additionalLocalFiles = additionalLocalFiles;
  }

  /**
   * Create a {@link DataSourceDescriptor} when the data shard calculation
   * happens in the App Master at runtime. The resulting descriptor reports
   * -1 shards, no location hint and no additional local files.
   *
   * @param inputDescriptor
   *          An {@link InputDescriptor} for the Input
   * @param initializerDescriptor
   *          An initializer for this Input which may run within the AM. This
   *          can be used to set the parallelism for this vertex and generate
   *          {@link InputDataInformationEvent}s for the actual Input.
   *          <p>
   *          If this is not specified, the parallelism must be set for the
   *          vertex. In addition, the Input should know how to access data for
   *          each of its tasks.
   *          <p>
   *          If an {@link InputInitializer} is meant to determine the
   *          parallelism of the vertex, the initial vertex parallelism should
   *          be set to -1. Can be null.
   * @param credentials Credentials needed to access the data. Can be null.
   * @return the new {@link DataSourceDescriptor}
   */
  public static DataSourceDescriptor create(InputDescriptor inputDescriptor,
                                            @Nullable InputInitializerDescriptor initializerDescriptor,
                                            @Nullable Credentials credentials) {
    return new DataSourceDescriptor(inputDescriptor, initializerDescriptor, credentials);
  }

  /**
   * Create a {@link DataSourceDescriptor} when the data shard calculation
   * happens in the client at compile time.
   *
   * @param inputDescriptor       An {@link InputDescriptor} for the Input
   * @param initializerDescriptor An initializer for this Input which may run within the AM.
   *                              This can be used to set the parallelism for this vertex and
   *                              generate {@link InputDataInformationEvent}s for the actual
   *                              Input.
   *                              <p>
   *                              If this is not specified, the parallelism must be set for the
   *                              vertex. In addition, the Input should know how to access data
   *                              for each of its tasks.
   *                              <p>
   *                              If an {@link InputInitializer} is meant to determine the
   *                              parallelism of the vertex, the initial vertex parallelism
   *                              should be set to -1. Can be null.
   * @param numShards             Number of shards of data
   * @param credentials           Credentials needed to access the data. Can be null.
   * @param locationHint          Location hints for the vertex tasks. Can be null.
   * @param additionalLocalFiles  additional local files required by this Input. An attempt
   *                              will be made to add these files to the Vertex as Private
   *                              resources. If a name conflict occurs, a {@link
   *                              org.apache.tez.dag.api.TezUncheckedException} will be thrown.
   *                              Can be null.
   * @return the new {@link DataSourceDescriptor}
   */
  public static DataSourceDescriptor create(InputDescriptor inputDescriptor,
                                            @Nullable InputInitializerDescriptor initializerDescriptor,
                                            int numShards,
                                            @Nullable Credentials credentials,
                                            @Nullable VertexLocationHint locationHint,
                                            @Nullable Map<String, LocalResource> additionalLocalFiles) {
    return new DataSourceDescriptor(inputDescriptor, initializerDescriptor, numShards, credentials,
        locationHint, additionalLocalFiles);
  }

  /**
   * Get the {@link InputDescriptor} for this {@link DataSourceDescriptor}
   * @return {@link InputDescriptor}
   */
  public InputDescriptor getInputDescriptor() {
    return inputDescriptor;
  }

  /**
   * Get the {@link InputInitializerDescriptor} for this {@link DataSourceDescriptor}
   * @return {@link InputInitializerDescriptor}, or null if none was specified
   */
  public @Nullable InputInitializerDescriptor getInputInitializerDescriptor() {
    return initializerDescriptor;
  }

  /**
   * This method can be used to specify a list of URIs for which Credentials
   * need to be obtained so that the job can run. An incremental list of URIs
   * can be provided by making multiple calls to the method.
   *
   * Currently, {@link Credentials} can only be fetched for HDFS and other
   * {@link org.apache.hadoop.fs.FileSystem} implementations that support
   * credentials.
   *
   * @param uris
   *          a list of {@link URI}s
   * @return this
   * @throws NullPointerException if {@code uris} is null
   */
  public synchronized DataSourceDescriptor addURIsForCredentials(Collection<URI> uris) {
    Objects.requireNonNull(uris, "URIs cannot be null");
    urisForCredentials.addAll(uris);
    return this;
  }

  /**
   * Get the URIs for which credentials will be obtained.
   *
   * <p>The result is a copy taken under the same lock that
   * {@link #addURIsForCredentials(Collection)} holds, so it can be iterated
   * safely while other threads keep adding URIs. URIs added after this call
   * are not reflected in a previously returned collection.
   *
   * @return an unmodifiable collection holding the URIs for which credentials
   *         are required at the time of the call.
   */
  public synchronized Collection<URI> getURIsForCredentials() {
    // Copy before wrapping: an unmodifiable *view* of the live set would still be
    // read by callers outside the lock while addURIsForCredentials mutates it.
    return Collections.unmodifiableCollection(Sets.newHashSet(urisForCredentials));
  }

  /**
   * Number of shards for this data source. If a vertex has only one
   * data source then the number of tasks in the vertex should be set to
   * the number of shards.
   * Returns -1 when this is determined at runtime in the AM.
   * @return number of shards
   */
  @InterfaceAudience.Private
  public int getNumberOfShards() {
    return numShards;
  }

  /**
   * Returns any credentials needed to access this data source.
   * @return the credentials supplied at creation, or null if none were given.
   */
  @InterfaceAudience.Private
  public @Nullable Credentials getCredentials() {
    return credentials;
  }

  /**
   * Get the location hints for the tasks in the vertex for this data source.
   * Is null when shard calculation happens on the AppMaster (default)
   * @return the {@link VertexLocationHint} supplied at creation, or null
   */
  @InterfaceAudience.Private
  public @Nullable VertexLocationHint getLocationHint() {
    return locationHint;
  }

  /**
   * Get the list of additional local files which were specified during creation.
   * @return Map of additional local files or null if there are none
   */
  @InterfaceAudience.Private
  public @Nullable Map<String, LocalResource> getAdditionalLocalFiles() {
    return additionalLocalFiles;
  }
}