blob: 3858b12df4ddce8caf39ea1a5ec26d7bec689be5 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Threading;
using System.Threading.Tasks;
using NSubstitute;
using Org.Apache.REEF.Client.Yarn.RestClient;
using Org.Apache.REEF.Client.YARN.RestClient;
using Org.Apache.REEF.Client.YARN.RestClient.DataModel;
using Org.Apache.REEF.Tang.Implementations.Tang;
using Org.Apache.REEF.Tang.Util;
using Org.Apache.REEF.Utilities.AsyncUtils;
using Xunit;
namespace Org.Apache.REEF.Client.Tests
{
public class YarnClientTests
{
[Fact]
public async Task TestGetClusterInfo()
{
// arrange
var ctx = new TestContext();
var urlProvider = ctx.UrlProviderFake;
var restReqExecutor = ctx.RestRequestExecutorFake;
var anyUri = Enumerable.Repeat(new Uri("anyscheme://anypath"), 1);
urlProvider.GetUrlAsync().Returns(Task.FromResult(anyUri));
var anyClusterInfo = new ClusterInfo
{
HadoopBuildVersion = "AnyBuildVersion",
HadoopVersion = "AnyVersion",
HadoopVersionBuiltOn = "AnyVersionBuildOn",
};
restReqExecutor.ExecuteAsync<ClusterInfo>(
Arg.Is<RestRequest>(
req =>
req.Resource == "ws/v1/cluster/info" && req.RootElement == "clusterInfo" &&
req.Method == Method.GET),
anyUri.First(),
CancellationToken.None).Returns(Task.FromResult(anyClusterInfo));
// act
var yarnClient = ctx.GetClient();
ClusterInfo actualClusterInfo = await yarnClient.GetClusterInfoAsync();
// assert
Assert.Equal(anyClusterInfo, actualClusterInfo);
var unused = urlProvider.Received(1).GetUrlAsync();
}
[Fact]
public async Task TestGetClusterMetrics()
{
var ctx = new TestContext();
var urlProvider = ctx.UrlProviderFake;
var restReqExecutor = ctx.RestRequestExecutorFake;
var anyUri = Enumerable.Repeat(new Uri("anyscheme://anypath"), 1);
urlProvider.GetUrlAsync().Returns(Task.FromResult(anyUri));
var anyClusterMetrics = new ClusterMetrics
{
ActiveNodes = 5,
AllocatedMB = 1000,
AllocatedVirtualCores = 10,
AppsCompleted = 301
};
restReqExecutor.ExecuteAsync<ClusterMetrics>(
Arg.Is<RestRequest>(
req =>
req.Resource == "ws/v1/cluster/metrics" && req.RootElement == "clusterMetrics" &&
req.Method == Method.GET),
anyUri.First(),
CancellationToken.None).Returns(Task.FromResult(anyClusterMetrics));
var yarnClient = ctx.GetClient();
ClusterMetrics actualClusterMetrics = await yarnClient.GetClusterMetricsAsync();
Assert.Equal(anyClusterMetrics, actualClusterMetrics);
var unused = urlProvider.Received(1).GetUrlAsync();
}
[Fact]
public async Task TestGetApplication()
{
var ctx = new TestContext();
var urlProvider = ctx.UrlProviderFake;
var restReqExecutor = ctx.RestRequestExecutorFake;
var anyUri = Enumerable.Repeat(new Uri("anyscheme://anypath"), 1);
const string applicationId = "AnyApplicationId";
urlProvider.GetUrlAsync().Returns(Task.FromResult(anyUri));
var anyApplication = new Application
{
AllocatedMB = 100,
AmHostHttpAddress = "http://anyhttpaddress",
AmContainerLogs = "SomeLogs",
ApplicationType = "AnyYarnApplicationType",
State = State.FINISHED,
Name = "AnyApplicationName",
RunningContainers = 0
};
restReqExecutor.ExecuteAsync<Application>(
Arg.Is<RestRequest>(
req =>
req.Resource == "ws/v1/cluster/apps/" + applicationId
&& req.RootElement == "app"
&& req.Method == Method.GET),
anyUri.First(),
CancellationToken.None).Returns(Task.FromResult(anyApplication));
var yarnClient = ctx.GetClient();
Application actualApplication = await yarnClient.GetApplicationAsync(applicationId);
Assert.Equal(anyApplication, actualApplication);
var unused = urlProvider.Received(1).GetUrlAsync();
}
[Fact]
public async Task TestGetApplicationFinalStatus()
{
var ctx = new TestContext();
var urlProvider = ctx.UrlProviderFake;
var restReqExecutor = ctx.RestRequestExecutorFake;
var anyUri = Enumerable.Repeat(new Uri("anyscheme://anypath"), 1);
const string applicationId = "AnyApplicationId";
urlProvider.GetUrlAsync().Returns(Task.FromResult(anyUri));
var anyApplication = new Application
{
AllocatedMB = 100,
AmHostHttpAddress = "http://anyhttpaddress",
AmContainerLogs = "SomeLogs",
ApplicationType = "AnyYarnApplicationType",
State = State.FINISHED,
FinalStatus = FinalState.SUCCEEDED,
Name = "AnyApplicationName",
RunningContainers = 0
};
restReqExecutor.ExecuteAsync<Application>(
Arg.Is<RestRequest>(
req =>
req.Resource == "ws/v1/cluster/apps/" + applicationId
&& req.RootElement == "app"
&& req.Method == Method.GET),
anyUri.First(),
CancellationToken.None).Returns(Task.FromResult(anyApplication));
var yarnClient = ctx.GetClient();
Application actualApplication = await yarnClient.GetApplicationAsync(applicationId);
Assert.Equal(actualApplication.FinalStatus, FinalState.SUCCEEDED);
}
[Fact]
public async Task TestCreateNewApplication()
{
var ctx = new TestContext();
var urlProvider = ctx.UrlProviderFake;
var restReqExecutor = ctx.RestRequestExecutorFake;
var anyUri = Enumerable.Repeat(new Uri("anyscheme://anypath"), 1);
const string applicationId = "AnyApplicationId";
urlProvider.GetUrlAsync().Returns(Task.FromResult(anyUri));
var anyNewApplication = new NewApplication
{
ApplicationId = applicationId
};
restReqExecutor.ExecuteAsync<NewApplication>(
Arg.Is<RestRequest>(
req =>
req.Resource == "ws/v1/cluster/apps/new-application"
&& req.Method == Method.POST),
anyUri.First(),
CancellationToken.None).Returns(Task.FromResult(anyNewApplication));
var yarnClient = ctx.GetClient();
NewApplication actualNewApplication = await yarnClient.CreateNewApplicationAsync();
Assert.Equal(anyNewApplication, actualNewApplication);
var unused = urlProvider.Received(1).GetUrlAsync();
}
[Fact]
public async Task TestSubmitNewApplication()
{
var ctx = new TestContext();
var urlProvider = ctx.UrlProviderFake;
var restReqExecutor = ctx.RestRequestExecutorFake;
var anyUri = Enumerable.Repeat(new Uri("anyscheme://anypath"), 1);
const string applicationId = "AnyApplicationId";
const string anyApplicationType = "REEFTest";
const string anyApplicationName = "AnyAPP";
urlProvider.GetUrlAsync().Returns(Task.FromResult(anyUri));
var anySubmitApplication = new SubmitApplication
{
ApplicationId = applicationId,
AmResource = new Resouce
{
MemoryMB = 500,
VCores = 1
},
ApplicationType = anyApplicationType,
ApplicationName = anyApplicationName,
KeepContainersAcrossApplicationAttempts = false,
MaxAppAttempts = 1,
Priority = 1,
UnmanagedAM = false,
AmContainerSpec = new AmContainerSpec
{
Commands = new Commands
{
Command = @"DONTCARE"
},
LocalResources = new LocalResources
{
Entries = new List<YARN.RestClient.DataModel.KeyValuePair<string, LocalResourcesValue>>
{
new YARN.RestClient.DataModel.KeyValuePair<string, LocalResourcesValue>
{
Key = "APPLICATIONWILLFAILBUTWEDONTCAREHERE",
Value = new LocalResourcesValue
{
Resource = "Foo",
Type = ResourceType.FILE,
Visibility = Visibility.APPLICATION
}
}
}
}
}
};
const string expectedJson = @"{" +
@"""application-id"":""AnyApplicationId""," +
@"""application-name"":""AnyAPP""," +
@"""queue"":null,""priority"":1," +
@"""am-container-spec"":" +
@"{" +
@"""local-resources"":" +
@"{" +
@"""entry"":" +
@"[" +
@"{" +
@"""key"":""APPLICATIONWILLFAILBUTWEDONTCAREHERE""," +
@"""value"":" +
@"{" +
@"""resource"":""Foo""," +
@"""type"":""FILE""," +
@"""visibility"":""APPLICATION""," +
@"""size"":0," +
@"""timestamp"":0" +
@"}" +
@"}" +
@"]" +
@"}," +
@"""environment"":null," +
@"""commands"":" +
@"{" +
@"""command"":""DONTCARE""" +
@"}," +
@"""service-data"":null," +
@"""credentials"":null," +
@"""application-acls"":null}," +
@"""unmanaged-AM"":false," +
@"""max-app-attempts"":1," +
@"""resource"":" +
@"{" +
@"""memory"":500," +
@"""vCores"":1" +
@"},""application-type"":""REEFTest""," +
@"""keep-containers-across-application-attempts"":false," +
@"""application-tags"":null" +
@"}";
var thisApplication = new Application
{
AllocatedMB = 100,
AmHostHttpAddress = "http://anyhttpaddress",
AmContainerLogs = "SomeLogs",
ApplicationType = "AnyYarnApplicationType",
State = State.FINISHED,
Name = "AnyApplicationName",
RunningContainers = 0
};
var response = new RestResponse<VoidResult>
{
StatusCode = HttpStatusCode.Accepted
};
restReqExecutor.ExecuteAsync(
Arg.Is<RestRequest>(
req =>
req.Resource == "ws/v1/cluster/apps"
&& req.Method == Method.POST
&& req.Content.Headers.ContentType.MediaType == "application/json"
&& IsExpectedJson(req, expectedJson)),
anyUri.First(),
CancellationToken.None).Returns(Task.FromResult(response));
restReqExecutor.ExecuteAsync<Application>(
Arg.Is<RestRequest>(
req =>
req.Resource == "ws/v1/cluster/apps/" + applicationId
&& req.RootElement == "app"
&& req.Method == Method.GET),
anyUri.First(),
CancellationToken.None).Returns(Task.FromResult(thisApplication));
var yarnClient = ctx.GetClient();
Application actualApplication = await yarnClient.SubmitApplicationAsync(anySubmitApplication);
Assert.Equal(thisApplication, actualApplication);
var unused = urlProvider.Received(2).GetUrlAsync();
}
private static bool IsExpectedJson(RestRequest req, string expectedJson)
{
return req.Content.ReadAsStringAsync().Result == expectedJson;
}
private class TestContext
{
public readonly IUrlProvider UrlProviderFake = Substitute.For<IUrlProvider>();
public readonly IRestRequestExecutor RestRequestExecutorFake = Substitute.For<IRestRequestExecutor>();
public IYarnRMClient GetClient()
{
var injector = TangFactory.GetTang().NewInjector();
injector.BindVolatileInstance(GenericType<IUrlProvider>.Class, UrlProviderFake);
injector.BindVolatileInstance(GenericType<IRestRequestExecutor>.Class, RestRequestExecutorFake);
return injector.GetInstance<IYarnRMClient>();
}
}
}
}