blob: 90487e04c4b3f4b9e06750825ae7251c27354c1c [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
using System;
using System.Collections.Generic;
using System.Globalization;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Timers;
using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage.Blob;
using Org.Apache.REEF.Client.API;
using Org.Apache.REEF.Client.Local;
using Org.Apache.REEF.Client.Yarn;
using Org.Apache.REEF.Tang.Implementations.Tang;
using Org.Apache.REEF.Tang.Interface;
using Org.Apache.REEF.Utilities;
using Org.Apache.REEF.Utilities.Diagnostics;
using Org.Apache.REEF.Utilities.Logging;
using Xunit;
using Timer = System.Timers.Timer;
namespace Org.Apache.REEF.Tests.Functional
{
public class ReefFunctionalTest : IDisposable
{
protected const string DriverStdout = "driver.stdout";
protected const string DriverStderr = "driver.stderr";
protected const string EvaluatorStdout = "evaluator.stdout";
protected const string CmdFile = "run.cmd";
protected const string BinFolder = ".";
protected const string DefaultRuntimeFolder = "REEF_LOCAL_RUNTIME";
private const string Local = "local";
private const string YARN = "yarn";
private const int SleepTime = 1000;
private readonly static Logger Logger = Logger.GetLogger(typeof(ReefFunctionalTest));
private const string StorageAccountKeyEnvironmentVariable = "REEFTestStorageAccountKey";
private const string StorageAccountNameEnvironmentVariable = "REEFTestStorageAccountName";
private bool _testSuccess = false;
private bool _onLocalRuntime = false;
private readonly bool _enableRealtimeLogUpload = false;
protected string TestId { get; set; }
protected Timer TestTimer { get; set; }
protected Task TimerTask { get; set; }
protected bool TestSuccess
{
get { return _testSuccess; }
set { _testSuccess = value; }
}
protected bool IsOnLocalRuntime
{
get { return _onLocalRuntime; }
set { _onLocalRuntime = value; }
}
public ReefFunctionalTest()
{
Init();
}
public void Init()
{
TestId = Guid.NewGuid().ToString("N").Substring(0, 8);
Logger.Log(Level.Info, "Running test " + TestId + ". If failed AND log uploaded is enabled, log can be find in " + Path.Combine(DateTime.Now.ToString("yyyy-MM-dd", CultureInfo.InvariantCulture), TestId));
if (_enableRealtimeLogUpload)
{
TimerTask = new Task(() =>
{
TestTimer = new Timer()
{
Interval = 1000,
Enabled = true,
AutoReset = true
};
TestTimer.Elapsed += PeriodicUploadLog;
TestTimer.Start();
});
TimerTask.Start();
}
ValidationUtilities.ValidateEnvVariable("JAVA_HOME");
if (!Directory.Exists(BinFolder))
{
throw new InvalidOperationException(BinFolder + " not found in current directory, cannot init test");
}
}
protected void CleanUp(string testFolder = DefaultRuntimeFolder)
{
Logger.Log(Level.Verbose, "Cleaning up test.");
if (_enableRealtimeLogUpload)
{
if (TimerTask != null)
{
TestTimer.Stop();
TimerTask.Dispose();
TimerTask = null;
}
// Wait for file upload task to complete
Thread.Sleep(500);
}
string dir = Path.Combine(Directory.GetCurrentDirectory(), testFolder);
try
{
if (Directory.Exists(dir))
{
Directory.Delete(dir, true);
}
}
catch (IOException)
{
// do not fail if clean up is unsuccessful
}
catch (UnauthorizedAccessException)
{
// do not fail if clean up is unsuccessful
}
}
public void Dispose()
{
CleanUp();
}
protected void ValidateSuccessForLocalRuntime(int numberOfContextsToClose, int numberOfTasksToFail = 0, int numberOfEvaluatorsToFail = 0, string testFolder = DefaultRuntimeFolder)
{
const string successIndication = "EXIT: ActiveContextClr2Java::Close";
const string failedTaskIndication = "Java_org_apache_reef_javabridge_NativeInterop_clrSystemFailedTaskHandlerOnNext";
const string failedEvaluatorIndication = "Java_org_apache_reef_javabridge_NativeInterop_clrSystemFailedEvaluatorHandlerOnNext";
string[] lines = ReadLogFile(DriverStdout, "driver", testFolder);
Logger.Log(Level.Verbose, "Lines read from log file : " + lines.Count());
string[] successIndicators = lines.Where(s => s.Contains(successIndication)).ToArray();
string[] failedTaskIndicators = lines.Where(s => s.Contains(failedTaskIndication)).ToArray();
string[] failedEvaluatorIndicators = lines.Where(s => s.Contains(failedEvaluatorIndication)).ToArray();
Assert.True(numberOfContextsToClose == successIndicators.Length,
"Expected number of contexts to close (" + numberOfContextsToClose + ") differs from actual number of success indicators (" + successIndicators.Length + ")");
Assert.True(numberOfTasksToFail == failedTaskIndicators.Length,
"Expected number of tasks to fail (" + numberOfTasksToFail + ") differs from actual number of failed task indicators (" + failedTaskIndicators.Length + ")");
Assert.True(numberOfEvaluatorsToFail == failedEvaluatorIndicators.Length,
"Expected number of evaluators to fail (" + numberOfEvaluatorsToFail + ") differs from actual number of failed evaluator indicators (" + failedEvaluatorIndicators.Length + ")");
}
/// <summary>
/// See <see cref="ValidateMessageSuccessfullyLogged"/> for detail. This function is <see cref="ValidateMessageSuccessfullyLogged"/>
/// for the driver log.
/// </summary>
protected void ValidateMessageSuccessfullyLoggedForDriver(string message, string testFolder, int numberOfOccurrences = 1)
{
var msgs = new List<string> { message };
ValidateMessageSuccessfullyLogged(msgs, "driver", DriverStdout, testFolder, numberOfOccurrences);
}
/// <summary>
/// Validates that each of the message provided in the <see cref="messages"/> parameter occurs
/// some number of times.
/// If <see cref="numberOfOccurrences"/> is greater than or equal to 0, validates that each of the message in
/// <see cref="messages"/> occur <see cref="numberOfOccurrences"/> times.
/// If <see cref="numberOfOccurrences"/> is less than 0, validates that each of the message in <see cref="messages"/>
/// occur at least once.
/// </summary>
protected void ValidateMessageSuccessfullyLogged(IList<string> messages, string subfolder, string fileName, string testFolder, int numberOfOccurrences = 1)
{
string[] lines = ReadLogFile(fileName, subfolder, testFolder);
foreach (string message in messages)
{
string[] successIndicators = lines.Where(s => s.Contains(message)).ToArray();
if (numberOfOccurrences > 0)
{
Assert.True(numberOfOccurrences == successIndicators.Count(),
"Expected number of message \"" + message + "\" occurrences " + numberOfOccurrences + " differs from actual " + successIndicators.Count());
}
else if (numberOfOccurrences == 0)
{
Assert.True(0 == successIndicators.Count(),
"Message \"" + message + "\" not expected to occur but occurs " + successIndicators.Count() + " times");
}
else
{
Assert.True(successIndicators.Count() > 0, "Message \"" + message + "\" expected to occur, but did not.");
}
}
}
protected void PeriodicUploadLog(object source, ElapsedEventArgs e)
{
try
{
UploadDriverLog();
}
catch (Exception)
{
// log not available yet, ignore it
}
}
internal string[] ReadLogFile(string logFileName, string subfolder = "driver", string testFolder = DefaultRuntimeFolder)
{
string fileName = string.Empty;
string[] lines = null;
for (int i = 0; i < 60; i++)
{
try
{
fileName = GetLogFileName(logFileName, subfolder, testFolder);
lines = File.ReadAllLines(fileName);
break;
}
catch (Exception e)
{
if (i == 59)
{
// log only last exception before failure
Logger.Log(Level.Verbose, e.ToString());
}
if (i < 59)
{
Thread.Sleep(SleepTime);
}
}
}
Assert.True(lines != null, "Cannot read from log file " + fileName);
return lines;
}
protected string GetLogFileName(string logFileName, string subfolder = "driver", string testFolder = DefaultRuntimeFolder)
{
string driverContainerDirectory = Directory.GetDirectories(Path.Combine(Directory.GetCurrentDirectory(), testFolder), subfolder, SearchOption.AllDirectories).SingleOrDefault();
Logger.Log(Level.Verbose, "GetLogFileName, driverContainerDirectory:" + driverContainerDirectory);
if (string.IsNullOrWhiteSpace(driverContainerDirectory))
{
throw new InvalidOperationException("Cannot find driver container directory: " + driverContainerDirectory);
}
string logFile = Path.Combine(driverContainerDirectory, logFileName);
if (!File.Exists(logFile))
{
throw new InvalidOperationException("Log file not found: " + logFile);
}
return logFile;
}
private void UploadDriverLog()
{
string driverStdout = GetLogFileName(DriverStdout);
string driverStderr = GetLogFileName(DriverStderr);
CloudStorageAccount storageAccount = CloudStorageAccount.Parse(GetStorageConnectionString());
CloudBlobClient blobClient = storageAccount.CreateCloudBlobClient();
CloudBlobContainer container = blobClient.GetContainerReference(DateTime.Now.ToString("yyyy-MM-dd", CultureInfo.InvariantCulture));
container.CreateIfNotExists();
CloudBlockBlob blob = container.GetBlockBlobReference(Path.Combine(TestId, "driverStdOut"));
FileStream fs = new FileStream(driverStdout, FileMode.Open, FileAccess.Read, FileShare.ReadWrite);
blob.UploadFromStream(fs);
fs.Close();
blob = container.GetBlockBlobReference(Path.Combine(TestId, "driverStdErr"));
fs = new FileStream(driverStderr, FileMode.Open, FileAccess.Read, FileShare.ReadWrite);
blob.UploadFromStream(fs);
fs.Close();
}
/// <summary>
/// Assembles the storage account connection string from the environment.
/// </summary>
/// <returns>the storage account connection string assembled from the environment.</returns>
/// <exception cref="Exception">If the environment variables aren't set.</exception>
private static string GetStorageConnectionString()
{
var accountName = GetEnvironmentVariable(StorageAccountNameEnvironmentVariable,
"Please set " + StorageAccountNameEnvironmentVariable + " to the storage account name to be used for the tests");
var accountKey = GetEnvironmentVariable(StorageAccountKeyEnvironmentVariable,
"Please set " + StorageAccountKeyEnvironmentVariable + " to the key of the storage account to be used for the tests");
var result = @"DefaultEndpointsProtocol=https;AccountName=" + accountName + ";AccountKey=" + accountKey;
return result;
}
/// <summary>
/// Fetch the value of the given environment variable
/// </summary>
/// <param name="variableName"></param>
/// <param name="errorMessageIfNotAvailable"></param>
/// <returns>the value of the given environment variable</returns>
/// <exception cref="Exception">
/// If the environment variables is not set. The message is taken from
/// errorMessageIfNotAvailable
/// </exception>
private static string GetEnvironmentVariable(string variableName, string errorMessageIfNotAvailable)
{
var result = Environment.GetEnvironmentVariable(variableName);
if (string.IsNullOrWhiteSpace(result))
{
Exceptions.Throw(new Exception(errorMessageIfNotAvailable), Logger);
}
return result;
}
protected void TestRun(IConfiguration driverConfig, Type globalAssemblyType, int numberOfEvaluator, string jobIdentifier = "myDriver", string runOnYarn = "local", string runtimeFolder = DefaultRuntimeFolder)
{
IInjector injector = TangFactory.GetTang().NewInjector(GetRuntimeConfiguration(runOnYarn, numberOfEvaluator, runtimeFolder));
var reefClient = injector.GetInstance<IREEFClient>();
var jobRequestBuilder = injector.GetInstance<JobRequestBuilder>();
var jobSubmission = jobRequestBuilder
.AddDriverConfiguration(driverConfig)
.AddGlobalAssemblyForType(globalAssemblyType)
.SetJobIdentifier(jobIdentifier)
.Build();
reefClient.SubmitAndGetJobStatus(jobSubmission);
}
private IConfiguration GetRuntimeConfiguration(string runOnYarn, int numberOfEvaluator, string runtimeFolder)
{
switch (runOnYarn)
{
case Local:
var dir = Path.Combine(".", runtimeFolder);
return LocalRuntimeClientConfiguration.ConfigurationModule
.Set(LocalRuntimeClientConfiguration.NumberOfEvaluators, numberOfEvaluator.ToString())
.Set(LocalRuntimeClientConfiguration.RuntimeFolder, dir)
.Build();
case YARN:
return YARNClientConfiguration.ConfigurationModule.Build();
default:
throw new Exception("Unknown runtime: " + runOnYarn);
}
}
}
}