/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ignite.internal.processors.hadoop.impl.proto;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.ipc.ProtocolSignature;
import org.apache.hadoop.mapreduce.Cluster;
import org.apache.hadoop.mapreduce.ClusterMetrics;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.mapreduce.QueueAclsInfo;
import org.apache.hadoop.mapreduce.QueueInfo;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskCompletionEvent;
import org.apache.hadoop.mapreduce.TaskReport;
import org.apache.hadoop.mapreduce.TaskTrackerInfo;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.mapreduce.v2.LogParams;
import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.security.token.Token;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.client.GridClientException;
import org.apache.ignite.internal.processors.hadoop.HadoopCommonUtils;
import org.apache.ignite.internal.processors.hadoop.HadoopJobId;
import org.apache.ignite.internal.processors.hadoop.HadoopJobProperty;
import org.apache.ignite.internal.processors.hadoop.HadoopJobStatus;
import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters;
import org.apache.ignite.internal.processors.hadoop.impl.HadoopMapReduceCounters;
import org.apache.ignite.internal.processors.hadoop.impl.HadoopUtils;
import org.apache.ignite.internal.processors.hadoop.mapreduce.MapReduceClient;
import org.apache.ignite.internal.processors.hadoop.proto.HadoopProtocolJobCountersTask;
import org.apache.ignite.internal.processors.hadoop.proto.HadoopProtocolJobStatusTask;
import org.apache.ignite.internal.processors.hadoop.proto.HadoopProtocolKillJobTask;
import org.apache.ignite.internal.processors.hadoop.proto.HadoopProtocolNextTaskIdTask;
import org.apache.ignite.internal.processors.hadoop.proto.HadoopProtocolSubmitJobTask;
import org.apache.ignite.internal.processors.hadoop.proto.HadoopProtocolTaskArguments;
import org.apache.ignite.internal.util.typedef.internal.U;

import static org.apache.ignite.internal.processors.hadoop.impl.HadoopUtils.createJobInfo;

/**
 * Hadoop client protocol.
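 *
 * <p>Illustrative sketch only: jobs normally reach this protocol indirectly, through
 * {@code IgniteHadoopClientProtocolProvider}, by pointing the MapReduce client at an Ignite node.
 * The framework name and address values below are assumptions for the example; check the provider
 * documentation for the exact values expected by your setup.
 *
 * <pre>{@code
 * Configuration conf = new Configuration();
 *
 * // Assumed keys/values: MRConfig.FRAMEWORK_NAME = "mapreduce.framework.name",
 * // MRConfig.MASTER_ADDRESS = "mapreduce.jobtracker.address".
 * conf.set(MRConfig.FRAMEWORK_NAME, "ignite");
 * conf.set(MRConfig.MASTER_ADDRESS, "localhost:11211");
 *
 * Job job = Job.getInstance(conf, "example-job");
 * // ... set mapper/reducer classes, input and output paths ...
 * job.waitForCompletion(true); // Submitted and tracked through HadoopClientProtocol.
 * }</pre>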
 */
public class HadoopClientProtocol implements ClientProtocol {
    /** Protocol version. */
    private static final long PROTO_VER = 1L;

    /** Default Ignite system directory. */
    private static final String SYS_DIR = ".ignite/system";

    /** Configuration. */
    private final Configuration conf;

    /** Ignite client. */
    private final MapReduceClient cli;

    /** Last received version. */
    private long lastVer = -1;

    /** Last received status. */
    private HadoopJobStatus lastStatus;

    /**
     * Constructor.
     *
     * @param conf Configuration.
     * @param cli Client.
     */
    public HadoopClientProtocol(Configuration conf, MapReduceClient cli) {
        assert conf != null;
        assert cli != null;

        this.conf = conf;
        this.cli = cli;
    }

    /** {@inheritDoc} */
    @Override public JobID getNewJobID() throws IOException, InterruptedException {
        try {
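            // Request/response timestamps are stored in the configuration so the submission timeline travels with the job.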
            conf.setLong(HadoopCommonUtils.REQ_NEW_JOBID_TS_PROPERTY, U.currentTimeMillis());

            HadoopJobId jobID = execute(HadoopProtocolNextTaskIdTask.class);

            conf.setLong(HadoopCommonUtils.RESPONSE_NEW_JOBID_TS_PROPERTY, U.currentTimeMillis());

            return new JobID(jobID.globalId().toString(), jobID.localId());
        }
        catch (GridClientException e) {
            throw new IOException("Failed to get new job ID.", e);
        }
    }

    /** {@inheritDoc} */
    @Override public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts) throws IOException,
        InterruptedException {
        try {
            conf.setLong(HadoopCommonUtils.JOB_SUBMISSION_START_TS_PROPERTY, U.currentTimeMillis());
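            // Serialize the Hadoop security credentials (if any) so they can travel to the cluster as part of the job info.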
            byte[] credentials = null;

            if (ts != null)
                credentials = WritableUtils.toByteArray(ts);

            HadoopJobStatus status = execute(HadoopProtocolSubmitJobTask.class,
                jobId.getJtIdentifier(), jobId.getId(), createJobInfo(conf, credentials));

            if (status == null)
                throw new IOException("Failed to submit job (null status obtained): " + jobId);

            return processStatus(status);
        }
        catch (GridClientException | IgniteCheckedException e) {
            throw new IOException("Failed to submit job.", e);
        }
    }

    /** {@inheritDoc} */
    @Override public ClusterMetrics getClusterMetrics() throws IOException, InterruptedException {
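        // Ignite's map-reduce engine has no task trackers or slot accounting; report fixed placeholder
        // metrics so that clients inspecting cluster capacity keep working.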
        return new ClusterMetrics(0, 0, 0, 0, 0, 0, 1000, 1000, 1, 100, 0, 0);
    }

    /** {@inheritDoc} */
    @Override public Cluster.JobTrackerStatus getJobTrackerStatus() throws IOException, InterruptedException {
        return Cluster.JobTrackerStatus.RUNNING;
    }

    /** {@inheritDoc} */
    @Override public long getTaskTrackerExpiryInterval() throws IOException, InterruptedException {
        return 0;
    }

    /** {@inheritDoc} */
    @Override public AccessControlList getQueueAdmins(String queueName) throws IOException {
        return new AccessControlList("*");
    }

    /** {@inheritDoc} */
    @Override public void killJob(JobID jobId) throws IOException, InterruptedException {
        try {
            execute(HadoopProtocolKillJobTask.class, jobId.getJtIdentifier(), jobId.getId());
        }
        catch (GridClientException e) {
            throw new IOException("Failed to kill job: " + jobId, e);
        }
    }

    /** {@inheritDoc} */
    @Override public void setJobPriority(JobID jobid, String priority) throws IOException, InterruptedException {
        // No-op.
    }

    /** {@inheritDoc} */
    @Override public boolean killTask(TaskAttemptID taskId, boolean shouldFail) throws IOException,
        InterruptedException {
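        // Individual task attempts cannot be killed through this protocol; always report failure to the caller.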
        return false;
    }

    /** {@inheritDoc} */
    @Override public JobStatus getJobStatus(JobID jobId) throws IOException, InterruptedException {
        try {
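            // Optional poll delay from the job configuration; when set, it is forwarded to the status task.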
            Long delay = conf.getLong(HadoopJobProperty.JOB_STATUS_POLL_DELAY.propertyName(), -1);

            HadoopJobStatus status;

            if (delay >= 0)
                status = execute(HadoopProtocolJobStatusTask.class, jobId.getJtIdentifier(), jobId.getId(), delay);
            else
                status = execute(HadoopProtocolJobStatusTask.class, jobId.getJtIdentifier(), jobId.getId());

            if (status == null)
                throw new IOException("Job tracker doesn't have any information about the job: " + jobId);

            return processStatus(status);
        }
        catch (GridClientException e) {
            throw new IOException("Failed to get job status: " + jobId, e);
        }
    }

    /** {@inheritDoc} */
    @Override public Counters getJobCounters(JobID jobId) throws IOException, InterruptedException {
        try {
            final HadoopCounters counters = execute(HadoopProtocolJobCountersTask.class,
                jobId.getJtIdentifier(), jobId.getId());

            if (counters == null)
                throw new IOException("Job tracker doesn't have any information about the job: " + jobId);

            return new HadoopMapReduceCounters(counters);
        }
        catch (GridClientException e) {
            throw new IOException("Failed to get job counters: " + jobId, e);
        }
    }

    /** {@inheritDoc} */
    @Override public TaskReport[] getTaskReports(JobID jobid, TaskType type) throws IOException, InterruptedException {
        return new TaskReport[0];
    }

    /** {@inheritDoc} */
    @Override public String getFilesystemName() throws IOException, InterruptedException {
        return FileSystem.get(conf).getUri().toString();
    }

    /** {@inheritDoc} */
    @Override public JobStatus[] getAllJobs() throws IOException, InterruptedException {
        return new JobStatus[0];
    }

    /** {@inheritDoc} */
    @Override public TaskCompletionEvent[] getTaskCompletionEvents(JobID jobid, int fromEventId, int maxEvents)
        throws IOException, InterruptedException {
        return new TaskCompletionEvent[0];
    }

    /** {@inheritDoc} */
    @Override public String[] getTaskDiagnostics(TaskAttemptID taskId) throws IOException, InterruptedException {
        return new String[0];
    }

    /** {@inheritDoc} */
    @Override public TaskTrackerInfo[] getActiveTrackers() throws IOException, InterruptedException {
        return new TaskTrackerInfo[0];
    }

    /** {@inheritDoc} */
    @Override public TaskTrackerInfo[] getBlacklistedTrackers() throws IOException, InterruptedException {
        return new TaskTrackerInfo[0];
    }

    /** {@inheritDoc} */
    @Override public String getSystemDir() throws IOException, InterruptedException {
        Path sysDir = new Path(SYS_DIR);

        return sysDir.toString();
    }

    /** {@inheritDoc} */
    @Override public String getStagingAreaDir() throws IOException, InterruptedException {
        String usr = UserGroupInformation.getCurrentUser().getShortUserName();

        return HadoopUtils.stagingAreaDir(conf, usr).toString();
    }

    /** {@inheritDoc} */
    @Override public String getJobHistoryDir() throws IOException, InterruptedException {
        return JobHistoryUtils.getConfiguredHistoryServerDoneDirPrefix(conf);
    }

    /** {@inheritDoc} */
    @Override public QueueInfo[] getQueues() throws IOException, InterruptedException {
        return new QueueInfo[0];
    }

    /** {@inheritDoc} */
    @Override public QueueInfo getQueue(String queueName) throws IOException, InterruptedException {
        return null;
    }

    /** {@inheritDoc} */
    @Override public QueueAclsInfo[] getQueueAclsForCurrentUser() throws IOException, InterruptedException {
        return new QueueAclsInfo[0];
    }

    /** {@inheritDoc} */
    @Override public QueueInfo[] getRootQueues() throws IOException, InterruptedException {
        return new QueueInfo[0];
    }

    /** {@inheritDoc} */
    @Override public QueueInfo[] getChildQueues(String queueName) throws IOException, InterruptedException {
        return new QueueInfo[0];
    }

    /** {@inheritDoc} */
    @Override public Token<DelegationTokenIdentifier> getDelegationToken(Text renewer) throws IOException,
        InterruptedException {
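        // The Ignite job tracker does not issue Hadoop delegation tokens, so this and the renew/cancel
        // methods below are effectively no-ops.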
        return null;
    }

    /** {@inheritDoc} */
    @Override public long renewDelegationToken(Token<DelegationTokenIdentifier> token) throws IOException,
        InterruptedException {
        return 0;
    }

    /** {@inheritDoc} */
    @Override public void cancelDelegationToken(Token<DelegationTokenIdentifier> token) throws IOException,
        InterruptedException {
        // No-op.
    }

    /** {@inheritDoc} */
    @Override public LogParams getLogFileParams(JobID jobID, TaskAttemptID taskAttemptID) throws IOException,
        InterruptedException {
        return null;
    }

    /** {@inheritDoc} */
    @Override public long getProtocolVersion(String protocol, long clientVersion) throws IOException {
        return PROTO_VER;
    }

    /** {@inheritDoc} */
    @Override public ProtocolSignature getProtocolSignature(String protocol, long clientVersion, int clientMethodsHash)
        throws IOException {
        return ProtocolSignature.getProtocolSignature(this, protocol, clientVersion, clientMethodsHash);
    }

    /**
     * Execute task.
     *
     * @param taskCls Task class.
     * @param args Arguments.
     * @return Result.
     * @throws IOException If failed.
     * @throws GridClientException If failed.
     */
    private <T> T execute(Class taskCls, Object... args) throws IOException, GridClientException {
        HadoopProtocolTaskArguments args0 = args != null ? new HadoopProtocolTaskArguments(args) : null;
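        // The task is addressed by class name and executed on the Ignite compute grid through the client connection.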
        return cli.client().compute().execute(taskCls.getName(), args0);
    }

    /**
     * Process received status update.
     *
     * @param status Ignite status.
     * @return Hadoop status.
     */
    private JobStatus processStatus(HadoopJobStatus status) {
        // IMPORTANT! This method only works in a single-threaded environment. That is valid at the moment because
        // IgniteHadoopClientProtocolProvider creates a new instance of this class for every new job, and the Job
        // class serializes invocations of the submitJob() and getJobStatus() methods. However, if any of these
        // conditions changes in the future, and either the protocol serves statuses for several jobs or status
        // updates are no longer serialized, then we will have to fall back to a concurrent approach (e.g. using a
        // ConcurrentHashMap).
        // (vozerov)
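        // Keep only the freshest status seen so far; an out-of-order (older) response falls back to the cached one.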
        if (lastVer < status.version()) {
            lastVer = status.version();
            lastStatus = status;
        }
        else
            assert lastStatus != null;

        return HadoopUtils.status(lastStatus, conf);
    }

    /**
     * Gets the underlying Ignite MapReduce client.
     *
     * @return The client.
     */
    public MapReduceClient client() {
        return cli;
    }
}