[REEF-1820] Specify node names and relaxLocality flag in Evaluator request
Addresses the following:
* Specify node names and relaxLocality flag in Evaluator request in .Net
* Specify node names and relaxLocality flag in Evaluator request in Java
* Modify Bridge to patch changes from .Net to Java
* Extends Hello Reef for yarn as sample/e2e test for the current changes
JIRA:
[REEF-1820](https://issues.apache.org/jira/browse/REEF-1820)
Pull request:
This closes #1360
diff --git a/lang/cs/Org.Apache.REEF.Bridge/EvaluatorRequestorClr2Java.cpp b/lang/cs/Org.Apache.REEF.Bridge/EvaluatorRequestorClr2Java.cpp
index 7616c2c..90fe5d4 100644
--- a/lang/cs/Org.Apache.REEF.Bridge/EvaluatorRequestorClr2Java.cpp
+++ b/lang/cs/Org.Apache.REEF.Bridge/EvaluatorRequestorClr2Java.cpp
@@ -53,7 +53,7 @@
ManagedLog::LOGGER->LogStart("EvaluatorRequestorClr2Java::Submit");
JNIEnv *env = RetrieveEnv(_jvm);
jclass jclassEvaluatorRequestor = env->GetObjectClass(_jobjectEvaluatorRequestor);
- jmethodID jmidSubmit = env->GetMethodID(jclassEvaluatorRequestor, "submit", "(IIILjava/lang/String;Ljava/lang/String;)V");
+ jmethodID jmidSubmit = env->GetMethodID(jclassEvaluatorRequestor, "submit", "(IIIZLjava/lang/String;Ljava/lang/String;Ljava/util/ArrayList;)V");
if (jmidSubmit == NULL) {
fprintf(stdout, " jmidSubmit is NULL\n");
@@ -66,8 +66,10 @@
request->Number,
request->MemoryMegaBytes,
request->VirtualCore,
+ request->RelaxLocality,
JavaStringFromManagedString(env, request->Rack),
- JavaStringFromManagedString(env, request->RuntimeName));
+ JavaStringFromManagedString(env, request->RuntimeName),
+ JavaArrayListFromManagedList(env, request->NodeNames));
ManagedLog::LOGGER->LogStop("EvaluatorRequestorClr2Java::Submit");
}
diff --git a/lang/cs/Org.Apache.REEF.Bridge/InteropUtil.cpp b/lang/cs/Org.Apache.REEF.Bridge/InteropUtil.cpp
index 628e7ba..4402c44 100644
--- a/lang/cs/Org.Apache.REEF.Bridge/InteropUtil.cpp
+++ b/lang/cs/Org.Apache.REEF.Bridge/InteropUtil.cpp
@@ -70,6 +70,21 @@
return env->NewString((const jchar*)wch, managedString->Length);
}
+jobject JavaArrayListFromManagedList(
+ JNIEnv *env,
+ System::Collections::Generic::ICollection<String^>^ managedNodeNames) {
+
+ jclass arrayListClazz = (*env).FindClass("java/util/ArrayList");
+ jobject arrayListObj = (*env).NewObject(arrayListClazz, (*env).GetMethodID(arrayListClazz, "<init>", "()V"));
+
+ for each (String^ nodeName in managedNodeNames)
+ {
+ jstring nodeNamestr = JavaStringFromManagedString(env, nodeName);
+ (*env).CallBooleanMethod(arrayListObj, (*env).GetMethodID(arrayListClazz, "add", "(Ljava/lang/Object;)Z"), nodeNamestr);
+ }
+ return arrayListObj;
+}
+
void HandleClr2JavaError(
JNIEnv *env,
String^ errorMessage,
diff --git a/lang/cs/Org.Apache.REEF.Bridge/InteropUtil.h b/lang/cs/Org.Apache.REEF.Bridge/InteropUtil.h
index c06377c..bbaf369 100644
--- a/lang/cs/Org.Apache.REEF.Bridge/InteropUtil.h
+++ b/lang/cs/Org.Apache.REEF.Bridge/InteropUtil.h
@@ -48,6 +48,10 @@
JNIEnv *env,
String^ managedString);
+jobject JavaArrayListFromManagedList(
+ JNIEnv *env,
+ System::Collections::Generic::ICollection<String^>^ managedNodeNames);
+
array<byte>^ ManagedByteArrayFromJavaByteArray(
JNIEnv *env,
jbyteArray javaByteArray);
diff --git a/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/EvaluatorRequestor.cs b/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/EvaluatorRequestor.cs
index 31cb36f..504ae17 100644
--- a/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/EvaluatorRequestor.cs
+++ b/lang/cs/Org.Apache.REEF.Driver/Bridge/Events/EvaluatorRequestor.cs
@@ -18,6 +18,7 @@
using System;
using System.Collections.Generic;
using System.Globalization;
+using System.Linq;
using System.Runtime.Serialization;
using Org.Apache.REEF.Common.Catalog;
using Org.Apache.REEF.Common.Evaluator;
@@ -64,7 +65,8 @@
public void Submit(IEvaluatorRequest request)
{
- LOGGER.Log(Level.Info, "Submitting request for {0} evaluators and {1} MB memory and {2} core to rack {3} and runtime {4}.", request.Number, request.MemoryMegaBytes, request.VirtualCore, request.Rack, request.RuntimeName);
+ LOGGER.Log(Level.Info, "Submitting request for {0} evaluators and {1} MB memory and {2} core to rack {3} runtime {4}, nodeNames to schedule {5} and RelaxLocality is {6}.",
+ request.Number, request.MemoryMegaBytes, request.VirtualCore, request.Rack, request.RuntimeName, string.Join(",", request.NodeNames.ToArray()), request.RelaxLocality);
lock (Evaluators)
{
for (var i = 0; i < request.Number; i++)
diff --git a/lang/cs/Org.Apache.REEF.Driver/Evaluator/EvaluatorRequest.cs b/lang/cs/Org.Apache.REEF.Driver/Evaluator/EvaluatorRequest.cs
index 685da50..2c8365d 100644
--- a/lang/cs/Org.Apache.REEF.Driver/Evaluator/EvaluatorRequest.cs
+++ b/lang/cs/Org.Apache.REEF.Driver/Evaluator/EvaluatorRequest.cs
@@ -16,6 +16,8 @@
// under the License.
using System;
+using System.Collections.Generic;
+using System.Linq;
using System.Runtime.Serialization;
namespace Org.Apache.REEF.Driver.Evaluator
@@ -27,37 +29,43 @@
internal class EvaluatorRequest : IEvaluatorRequest
{
internal EvaluatorRequest()
- : this(0, 0, 1, string.Empty, Guid.NewGuid().ToString("N"), string.Empty)
+ : this(0, 0, 1, string.Empty, Guid.NewGuid().ToString("N"), string.Empty, Enumerable.Empty<string>().ToList(), true)
{
}
internal EvaluatorRequest(int number, int megaBytes)
- : this(number, megaBytes, 1, string.Empty, Guid.NewGuid().ToString("N"), string.Empty)
+ : this(number, megaBytes, 1, string.Empty, Guid.NewGuid().ToString("N"), string.Empty, Enumerable.Empty<string>().ToList(), true)
{
}
internal EvaluatorRequest(int number, int megaBytes, int core)
- : this(number, megaBytes, core, string.Empty, Guid.NewGuid().ToString("N"), string.Empty)
+ : this(number, megaBytes, core, string.Empty, Guid.NewGuid().ToString("N"), string.Empty, Enumerable.Empty<string>().ToList(), true)
{
}
internal EvaluatorRequest(int number, int megaBytes, string rack)
- : this(number, megaBytes, 1, rack, Guid.NewGuid().ToString("N"), string.Empty)
+ : this(number, megaBytes, 1, rack, Guid.NewGuid().ToString("N"), string.Empty, Enumerable.Empty<string>().ToList(), true)
{
}
internal EvaluatorRequest(int number, int megaBytes, int core, string rack)
- : this(number, megaBytes, core, rack, Guid.NewGuid().ToString("N"), string.Empty)
+ : this(number, megaBytes, core, rack, Guid.NewGuid().ToString("N"), string.Empty, Enumerable.Empty<string>().ToList(), true)
{
}
- internal EvaluatorRequest(int number, int megaBytes, int core, string rack, string evaluatorBatchId)
- : this(number, megaBytes, core, rack, evaluatorBatchId, string.Empty)
+ internal EvaluatorRequest(int number, int megaBytes, int core, string rack, string evaluatorBatchId, ICollection<string> nodeNames)
+ : this(number, megaBytes, core, rack, evaluatorBatchId, string.Empty, nodeNames, true)
{
}
- internal EvaluatorRequest(int number, int megaBytes, int core, string rack, string evaluatorBatchId, string runtimeName)
+ internal EvaluatorRequest(int number, int megaBytes, int core, string rack, string evaluatorBatchId, ICollection<string> nodeNames, bool relaxLocality)
+ : this(number, megaBytes, core, rack, evaluatorBatchId, string.Empty, nodeNames, relaxLocality)
+
+ {
+ }
+
+ internal EvaluatorRequest(int number, int megaBytes, int core, string rack, string evaluatorBatchId, string runtimeName, ICollection<string> nodeNames, bool relaxLocality)
{
Number = number;
MemoryMegaBytes = megaBytes;
@@ -65,6 +73,8 @@
Rack = rack;
EvaluatorBatchId = evaluatorBatchId;
RuntimeName = runtimeName;
+ NodeNames = nodeNames;
+ RelaxLocality = relaxLocality;
}
[DataMember]
@@ -85,6 +95,12 @@
[DataMember]
public string RuntimeName { get; private set; }
+ [DataMember]
+ public ICollection<string> NodeNames { get; private set; }
+
+ [DataMember]
+ public bool RelaxLocality { get; private set; }
+
internal static EvaluatorRequestBuilder NewBuilder()
{
return new EvaluatorRequestBuilder();
diff --git a/lang/cs/Org.Apache.REEF.Driver/Evaluator/EvaluatorRequestBuilder.cs b/lang/cs/Org.Apache.REEF.Driver/Evaluator/EvaluatorRequestBuilder.cs
index d6e5a63..6ccd1fb 100644
--- a/lang/cs/Org.Apache.REEF.Driver/Evaluator/EvaluatorRequestBuilder.cs
+++ b/lang/cs/Org.Apache.REEF.Driver/Evaluator/EvaluatorRequestBuilder.cs
@@ -16,6 +16,8 @@
// under the License.
using System;
+using System.Collections.Generic;
+using System.Linq;
using Org.Apache.REEF.Common.Runtime;
namespace Org.Apache.REEF.Driver.Evaluator
@@ -25,6 +27,8 @@
private string _evaluatorBatchId;
private string _rackName;
private string _runtimeName;
+ private ICollection<string> _nodeNames;
+ private bool _relaxLocality;
internal EvaluatorRequestBuilder(IEvaluatorRequest request)
{
@@ -34,6 +38,8 @@
_evaluatorBatchId = request.EvaluatorBatchId;
_rackName = request.Rack;
_runtimeName = request.RuntimeName;
+ _nodeNames = request.NodeNames;
+ _relaxLocality = request.RelaxLocality;
}
internal EvaluatorRequestBuilder()
@@ -44,6 +50,8 @@
_rackName = string.Empty;
_evaluatorBatchId = Guid.NewGuid().ToString("N");
_runtimeName = string.Empty;
+ _nodeNames = Enumerable.Empty<string>().ToList();
+ _relaxLocality = true;
}
public int Number { get; private set; }
@@ -95,7 +103,29 @@
}
/// <summary>
- /// Sets the batch ID for requested evaluators in the same request. The batch of Evaluators requested in the
+ /// Set desired node names for the Evaluator to be allocated on.
+ /// </summary>
+ /// <param name="nodeNames"></param>
+ /// <returns>this</returns>
+ public EvaluatorRequestBuilder AddNodeNames(ICollection<string> nodeNames)
+ {
+ _nodeNames = nodeNames;
+ return this;
+ }
+
+ /// <summary>
+ /// Set a desired node name for evaluator to be allocated
+ /// </summary>
+ /// <param name="nodeName"></param>
+ /// <returns></returns>
+ public EvaluatorRequestBuilder AddNodeName(string nodeName)
+ {
+ _nodeNames.Add(nodeName);
+ return this;
+ }
+
+ /// <summary>
+ /// Sets the batch ID for requested evaluators in the same request. The batch of Evaluators requested in the
/// same request will have the same Evaluator Batch ID.
/// </summary>
/// <param name="evaluatorBatchId">The batch ID for the Evaluator request.</param>
@@ -117,12 +147,23 @@
}
/// <summary>
+ /// Set the relax locality for requesting evaluator with specified node names
+ /// </summary>
+ /// <param name="relaxLocality">Locality relax flag.</param>
+ /// <returns>this</returns>
+ public EvaluatorRequestBuilder SetRelaxLocality(bool relaxLocality)
+ {
+ _relaxLocality = relaxLocality;
+ return this;
+ }
+
+ /// <summary>
/// Build the EvaluatorRequest.
/// </summary>
/// <returns></returns>
public IEvaluatorRequest Build()
{
- return new EvaluatorRequest(Number, MegaBytes, VirtualCore, rack: _rackName, evaluatorBatchId: _evaluatorBatchId, runtimeName: _runtimeName);
+ return new EvaluatorRequest(Number, MegaBytes, VirtualCore, rack: _rackName, evaluatorBatchId: _evaluatorBatchId, runtimeName: _runtimeName, nodeNames: _nodeNames, relaxLocality: _relaxLocality);
}
}
}
\ No newline at end of file
diff --git a/lang/cs/Org.Apache.REEF.Driver/Evaluator/IEvaluatorRequest.cs b/lang/cs/Org.Apache.REEF.Driver/Evaluator/IEvaluatorRequest.cs
index 6b2ce21..357ffe3 100644
--- a/lang/cs/Org.Apache.REEF.Driver/Evaluator/IEvaluatorRequest.cs
+++ b/lang/cs/Org.Apache.REEF.Driver/Evaluator/IEvaluatorRequest.cs
@@ -15,6 +15,8 @@
// specific language governing permissions and limitations
// under the License.
+using System.Collections.Generic;
+
namespace Org.Apache.REEF.Driver.Evaluator
{
/// <summary>
@@ -43,14 +45,33 @@
string Rack { get; }
/// <summary>
+ /// The desired node names for the Evaluator to be allocated on.
+ /// </summary>
+ ICollection<string> NodeNames { get; }
+
+ /// <summary>
/// The batch ID for requested evaluators. Evaluators requested in the same batch
/// will have the same Batch ID.
/// </summary>
string EvaluatorBatchId { get; }
/// <summary>
- /// The name of the runtime to allocate teh evaluator on
+ /// The name of the runtime to allocate the evaluator on
/// </summary>
string RuntimeName { get; }
+
+ /// <summary>
+ /// For a request at a network hierarchy level, set whether locality can be relaxed to that level and beyond.
+ /// If the flag is off on a rack-level ResourceRequest, containers at that request's priority
+ /// will not be assigned to nodes on that request's rack unless requests specifically for
+ /// those nodes have also been submitted.
+ /// If the flag is off on an ANY-level ResourceRequest, containers at that request's priority
+ /// will only be assigned on racks for which specific requests have also been submitted.
+ /// For example, to request a container strictly on a specific node, the corresponding rack-level
+ /// and any-level requests should have locality relaxation set to false. Similarly,
+ /// to request a container strictly on a specific rack,
+ /// the corresponding any-level request should have locality relaxation set to false.
+ /// </summary>
+ bool RelaxLocality { get; }
}
}
\ No newline at end of file
diff --git a/lang/cs/Org.Apache.REEF.Examples.HelloREEF/HelloDriverYarn.cs b/lang/cs/Org.Apache.REEF.Examples.HelloREEF/HelloDriverYarn.cs
new file mode 100644
index 0000000..9c84d33
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Examples.HelloREEF/HelloDriverYarn.cs
@@ -0,0 +1,122 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using Org.Apache.REEF.Common.Tasks;
+using Org.Apache.REEF.Driver;
+using Org.Apache.REEF.Driver.Evaluator;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Tang.Util;
+using Org.Apache.REEF.Utilities.Logging;
+
+namespace Org.Apache.REEF.Examples.HelloREEF
+{
+ /// <summary>
+ /// The Driver for HelloREEF: It requests a single Evaluator and then submits the HelloTask to it.
+ /// </summary>
+ public sealed class HelloDriverYarn : IObserver<IAllocatedEvaluator>, IObserver<IDriverStarted>
+ {
+ private static readonly Logger Logger = Logger.GetLogger(typeof(HelloDriver));
+ private readonly IEvaluatorRequestor _evaluatorRequestor;
+
+ /// <summary>
+ /// List of node names for desired evaluators
+ /// </summary>
+ private readonly IList<string> _nodeNames;
+
+ /// <summary>
+ /// Specify if the desired node names is relaxed
+ /// </summary>
+ private readonly bool _relaxLocality;
+
+ /// <summary>
+ /// Constructor of the driver
+ /// </summary>
+ /// <param name="evaluatorRequestor">Evaluator Requestor</param>
+ /// <param name="nodeNames">Node names for evaluators</param>
+ /// <param name="relaxLocality">Relax indicator of evaluator node request</param>
+ [Inject]
+ private HelloDriverYarn(IEvaluatorRequestor evaluatorRequestor,
+ [Parameter(typeof(NodeNames))] ISet<string> nodeNames,
+ [Parameter(typeof(RelaxLocality))] bool relaxLocality)
+ {
+ _evaluatorRequestor = evaluatorRequestor;
+ _nodeNames = nodeNames.ToList();
+ _relaxLocality = relaxLocality;
+ }
+
+ /// <summary>
+ /// Submits the HelloTask to the Evaluator.
+ /// </summary>
+ /// <param name="allocatedEvaluator"></param>
+ public void OnNext(IAllocatedEvaluator allocatedEvaluator)
+ {
+ Logger.Log(Level.Info, "Received allocatedEvaluator-HostName: " + allocatedEvaluator.GetEvaluatorDescriptor().NodeDescriptor.HostName);
+ var taskConfiguration = TaskConfiguration.ConfigurationModule
+ .Set(TaskConfiguration.Identifier, "HelloTask")
+ .Set(TaskConfiguration.Task, GenericType<HelloTask>.Class)
+ .Build();
+ allocatedEvaluator.SubmitTask(taskConfiguration);
+ }
+
+ public void OnError(Exception error)
+ {
+ throw error;
+ }
+
+ public void OnCompleted()
+ {
+ }
+
+ /// <summary>
+ /// Called to start the user mode driver
+ /// </summary>
+ /// <param name="driverStarted"></param>
+ public void OnNext(IDriverStarted driverStarted)
+ {
+ Logger.Log(Level.Info, string.Format("HelloDriver started at {0}", driverStarted.StartTime));
+
+ if (_nodeNames != null && _nodeNames.Count > 0)
+ {
+ _evaluatorRequestor.Submit(_evaluatorRequestor.NewBuilder()
+ .AddNodeNames(_nodeNames)
+ .SetMegabytes(64)
+ .SetNumber(_nodeNames.Count)
+ .SetRelaxLocality(_relaxLocality)
+ .Build());
+ }
+ else
+ {
+ _evaluatorRequestor.Submit(_evaluatorRequestor.NewBuilder()
+ .SetMegabytes(64)
+ .Build());
+ }
+ }
+ }
+
+ [NamedParameter(documentation: "Set of node names for evaluators")]
+ internal class NodeNames : Name<ISet<string>>
+ {
+ }
+
+ [NamedParameter(documentation: "RelaxLocality for specifying evaluator node names", shortName: "RelaxLocality", defaultValue: "true")]
+ internal class RelaxLocality : Name<bool>
+ {
+ }
+}
\ No newline at end of file
diff --git a/lang/cs/Org.Apache.REEF.Examples.HelloREEF/HelloREEF.cs b/lang/cs/Org.Apache.REEF.Examples.HelloREEF/HelloREEF.cs
index 575a13f..20c80fa 100644
--- a/lang/cs/Org.Apache.REEF.Examples.HelloREEF/HelloREEF.cs
+++ b/lang/cs/Org.Apache.REEF.Examples.HelloREEF/HelloREEF.cs
@@ -103,7 +103,7 @@
}
}
- public static void Main(string[] args)
+ public static void MainSimple(string[] args)
{
TangFactory.GetTang().NewInjector(GetRuntimeConfiguration(args.Length > 0 ? args[0] : Local)).GetInstance<HelloREEF>().Run();
}
diff --git a/lang/cs/Org.Apache.REEF.Examples.HelloREEF/HelloREEFYarn.cs b/lang/cs/Org.Apache.REEF.Examples.HelloREEF/HelloREEFYarn.cs
new file mode 100644
index 0000000..fbccdec
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Examples.HelloREEF/HelloREEFYarn.cs
@@ -0,0 +1,161 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System.Collections.Generic;
+using System.IO;
+using System.Linq;
+using System.Threading;
+using Org.Apache.REEF.Client.API;
+using Org.Apache.REEF.Client.Common;
+using Org.Apache.REEF.Client.Yarn;
+using Org.Apache.REEF.Client.YARN.RestClient.DataModel;
+using Org.Apache.REEF.Driver;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Tang.Implementations.Configuration;
+using Org.Apache.REEF.Tang.Implementations.Tang;
+using Org.Apache.REEF.Tang.Interface;
+using Org.Apache.REEF.Tang.Util;
+using Org.Apache.REEF.Utilities.Logging;
+
+namespace Org.Apache.REEF.Examples.HelloREEF
+{
+ /// <summary>
+ /// A Tool that submits HelloREEFDriver for execution on YARN.
+ /// </summary>
+ public sealed class HelloREEFYarn
+ {
+ private const int ReTryCounts = 200;
+ private const int SleepTime = 2000;
+ private const string DefaultPortRangeStart = "2000";
+ private const string DefaultPortRangeCount = "20";
+ private const string TrustedApplicationTokenIdentifier = "TrustedApplicationTokenIdentifier";
+ private const string SecurityTokenId = "SecurityTokenId";
+ private const string SecurityTokenPwd = "SecurityTokenPwd";
+
+ private readonly IREEFClient _reefClient;
+ private readonly JobRequestBuilder _jobRequestBuilder;
+
+ private static readonly Logger Logger = Logger.GetLogger(typeof(HelloREEFYarn));
+
+ /// <summary>
+ /// List of node names for evaluators
+ /// </summary>
+ private readonly IList<string> _nodeNames;
+
+ [Inject]
+ private HelloREEFYarn(IREEFClient reefClient,
+ JobRequestBuilder jobRequestBuilder,
+ [Parameter(typeof(NodeNames))] ISet<string> nodeNames)
+ {
+ _reefClient = reefClient;
+ _jobRequestBuilder = jobRequestBuilder;
+ _nodeNames = nodeNames.ToList();
+ }
+
+ /// <summary>
+ /// Runs HelloREEF using the IREEFClient passed into the constructor.
+ /// </summary>
+ private void Run()
+ {
+ // The driver configuration contains all the needed handler bindings
+ var helloDriverConfiguration = DriverConfiguration.ConfigurationModule
+ .Set(DriverConfiguration.OnEvaluatorAllocated, GenericType<HelloDriverYarn>.Class)
+ .Set(DriverConfiguration.OnDriverStarted, GenericType<HelloDriverYarn>.Class)
+ .Build();
+
+ var driverConfig = TangFactory.GetTang()
+ .NewConfigurationBuilder(helloDriverConfiguration);
+
+ foreach (var n in _nodeNames)
+ {
+ driverConfig.BindSetEntry<NodeNames, string>(GenericType<NodeNames>.Class, n);
+ }
+
+ // The JobSubmission contains the Driver configuration as well as the files needed on the Driver.
+ var helloJobRequest = _jobRequestBuilder
+ .AddDriverConfiguration(driverConfig.Build())
+ .AddGlobalAssemblyForType(typeof(HelloDriverYarn))
+ .SetJobIdentifier("HelloREEF")
+ .SetJavaLogLevel(JavaLoggingSetting.Verbose)
+ .Build();
+
+ var result = _reefClient.SubmitAndGetJobStatus(helloJobRequest);
+ var state = PullFinalJobStatus(result);
+ Logger.Log(Level.Info, "Application state : {0}.", state);
+ }
+
+ /// <summary>
+ /// This is to pull job final status until the Job is done
+ /// </summary>
+ /// <param name="jobSubmitionResult"></param>
+ /// <returns></returns>
+ private FinalState PullFinalJobStatus(IJobSubmissionResult jobSubmitionResult)
+ {
+ int n = 0;
+ var state = jobSubmitionResult.FinalState;
+ while (state.Equals(FinalState.UNDEFINED) && n++ < ReTryCounts)
+ {
+ Thread.Sleep(SleepTime);
+ state = jobSubmitionResult.FinalState;
+ }
+ return state;
+ }
+
+ /// <summary>
+ /// Get runtime configuration
+ /// </summary>
+ /// <returns></returns>
+ private static IConfiguration GetRuntimeConfiguration(string[] args)
+ {
+ var c = YARNClientConfiguration.ConfigurationModule
+ .Set(YARNClientConfiguration.SecurityTokenKind, TrustedApplicationTokenIdentifier)
+ .Set(YARNClientConfiguration.SecurityTokenService, TrustedApplicationTokenIdentifier)
+ .Build();
+
+ File.WriteAllText(SecurityTokenId, args[0]);
+ File.WriteAllText(SecurityTokenPwd, args[1]);
+
+ IConfiguration tcpPortConfig = TcpPortConfigurationModule.ConfigurationModule
+ .Set(TcpPortConfigurationModule.PortRangeStart, args.Length > 2 ? args[2] : DefaultPortRangeStart)
+ .Set(TcpPortConfigurationModule.PortRangeCount, args.Length > 3 ? args[3] : DefaultPortRangeCount)
+ .Build();
+
+ return Configurations.Merge(c, tcpPortConfig);
+ }
+
+ /// <summary>
+ /// HelloREEF example running on YARN
+ /// Usage: Org.Apache.REEF.Examples.HelloREEF SecurityTokenId SecurityTokenPw [portRangerStart] [portRangeCount] [nodeName1] [nodeName2]...
+ /// </summary>
+ /// <param name="args"></param>
+ public static void MainYarn(string[] args)
+ {
+ var configBuilder = TangFactory.GetTang()
+ .NewConfigurationBuilder(GetRuntimeConfiguration(args));
+
+ if (args.Length > 4)
+ {
+ for (int i = 4; i < args.Length; i++)
+ {
+ configBuilder.BindSetEntry<NodeNames, string>(GenericType<NodeNames>.Class, args[i]);
+ }
+ }
+
+ TangFactory.GetTang().NewInjector(configBuilder.Build()).GetInstance<HelloREEFYarn>().Run();
+ }
+ }
+}
\ No newline at end of file
diff --git a/lang/cs/Org.Apache.REEF.Examples.HelloREEF/Org.Apache.REEF.Examples.HelloREEF.csproj b/lang/cs/Org.Apache.REEF.Examples.HelloREEF/Org.Apache.REEF.Examples.HelloREEF.csproj
index d60e48c..be7fed1 100644
--- a/lang/cs/Org.Apache.REEF.Examples.HelloREEF/Org.Apache.REEF.Examples.HelloREEF.csproj
+++ b/lang/cs/Org.Apache.REEF.Examples.HelloREEF/Org.Apache.REEF.Examples.HelloREEF.csproj
@@ -31,9 +31,12 @@
<Link>Properties\SharedAssemblyInfo.cs</Link>
</Compile>
<Compile Include="HelloDriver.cs" />
+ <Compile Include="HelloDriverYarn.cs" />
<Compile Include="HelloREEF.cs" />
+ <Compile Include="HelloREEFYarn.cs" />
<Compile Include="HelloTask.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
+ <Compile Include="Run.cs" />
</ItemGroup>
<ItemGroup>
<None Include="$(SolutionDir)\App.config">
diff --git a/lang/cs/Org.Apache.REEF.Examples.HelloREEF/Run.cs b/lang/cs/Org.Apache.REEF.Examples.HelloREEF/Run.cs
new file mode 100644
index 0000000..f8aa715
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Examples.HelloREEF/Run.cs
@@ -0,0 +1,38 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+namespace Org.Apache.REEF.Examples.HelloREEF
+{
+ public sealed class Run
+ {
+ /// <summary>
+ /// Program that runs hello reef
+ /// </summary>
+ /// <param name="args"></param>
+ public static void Main(string[] args)
+ {
+ if (args.Length < 2)
+ {
+ HelloREEF.MainSimple(args);
+ }
+ else
+ {
+ HelloREEFYarn.MainYarn(args);
+ }
+ }
+ }
+}
diff --git a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/EvaluatorRequestorBridge.java b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/EvaluatorRequestorBridge.java
index 0db4388..518537d 100644
--- a/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/EvaluatorRequestorBridge.java
+++ b/lang/java/reef-bridge-java/src/main/java/org/apache/reef/javabridge/EvaluatorRequestorBridge.java
@@ -66,8 +66,10 @@
public void submit(final int evaluatorsNumber,
final int memory,
final int virtualCore,
+ final boolean relaxLocality,
final String rack,
- final String runtimeName) {
+ final String runtimeName,
+ final ArrayList<String> nodeNames) {
if (this.isBlocked) {
throw new RuntimeException("Cannot request additional Evaluator, this is probably because " +
"the Driver has crashed and restarted, and cannot ask for new container due to YARN-2433.");
@@ -85,6 +87,8 @@
.setMemory(memory)
.setNumberOfCores(virtualCore)
.setRuntimeName(runtimeName)
+ .setRelaxLocality(relaxLocality)
+ .addNodeNames(nodeNames)
.build();
LOG.log(Level.FINE, "submitting evaluator request {0}", request);
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/driver/evaluator/EvaluatorRequest.java b/lang/java/reef-common/src/main/java/org/apache/reef/driver/evaluator/EvaluatorRequest.java
index 38494ac..dfed8f6 100644
--- a/lang/java/reef-common/src/main/java/org/apache/reef/driver/evaluator/EvaluatorRequest.java
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/driver/evaluator/EvaluatorRequest.java
@@ -40,6 +40,7 @@
private final List<String> nodeNames;
private final List<String> rackNames;
private final String runtimeName;
+ private final boolean relaxLocality;
EvaluatorRequest(final int number,
final int megaBytes,
@@ -55,12 +56,24 @@
final List<String> nodeNames,
final List<String> rackNames,
final String runtimeName) {
+ this(number, megaBytes, cores, nodeNames, rackNames, runtimeName, true);
+ }
+
+
+ EvaluatorRequest(final int number,
+ final int megaBytes,
+ final int cores,
+ final List<String> nodeNames,
+ final List<String> rackNames,
+ final String runtimeName,
+ final boolean relaxLocality) {
this.number = number;
this.megaBytes = megaBytes;
this.cores = cores;
this.nodeNames = nodeNames;
this.rackNames = rackNames;
this.runtimeName = runtimeName;
+ this.relaxLocality = relaxLocality;
}
/**
@@ -137,6 +150,16 @@
}
/**
+ * Access the locality relax flag.
+ *
+ * @return the value of relaxLocality. If not set default is true.
+ */
+ public boolean getRelaxLocality() {
+ return relaxLocality;
+ }
+
+
+ /**
* {@link EvaluatorRequest}s are build using this Builder.
*/
public static class Builder<T extends Builder> implements org.apache.reef.util.Builder<EvaluatorRequest> {
@@ -147,6 +170,7 @@
private final List<String> nodeNames = new ArrayList<>();
private final List<String> rackNames = new ArrayList<>();
private String runtimeName = "";
+ private boolean relaxLocality = true; //if not set, default to true
@Private
public Builder() {
@@ -163,6 +187,7 @@
setMemory(request.getMegaBytes());
setNumberOfCores(request.getNumberOfCores());
setRuntimeName(request.getRuntimeName());
+ setRelaxLocality(request.getRelaxLocality());
for (final String nodeName : request.getNodeNames()) {
addNodeName(nodeName);
}
@@ -233,6 +258,21 @@
}
/**
+ * Adds node names.They are the preferred locations where the evaluator should
+ * run on. If any of the node is available, the RM will try to allocate the
+ * evaluator there
+ *
+ * @param nodeNamesList preferred node names
+ * @return this Builder.
+ */
+ public T addNodeNames(final List<String> nodeNamesList) {
+ if(nodeNamesList != null) {
+ this.nodeNames.addAll(nodeNamesList);
+ }
+ return (T) this;
+ }
+
+ /**
* Adds a rack name. It is the preferred location where the evaluator should
* run on. If the rack is available, the RM will try to allocate the
* evaluator in one of its nodes. The RM will try to match node names first,
@@ -247,11 +287,25 @@
}
/**
+ * A boolean relaxLocality flag defaulting to true, which tells the ResourceManager
+ * if the application wants locality to be loose (i.e. allows fall-through to rack or any)
+ * or strict (i.e. specify hard constraint on resource allocation).
+ *
+ * @param relaxLocalityFlg locality relaxation is enabled with this ResourceRequest
+ * @return this Builder.
+ */
+ public T setRelaxLocality(final boolean relaxLocalityFlg) {
+ this.relaxLocality = relaxLocalityFlg;
+ return (T) this;
+ }
+
+ /**
* Builds the {@link EvaluatorRequest}.
*/
@Override
public EvaluatorRequest build() {
- return new EvaluatorRequest(this.n, this.megaBytes, this.cores, this.nodeNames, this.rackNames, this.runtimeName);
+ return new EvaluatorRequest(this.n, this.megaBytes, this.cores, this.nodeNames,
+ this.rackNames, this.runtimeName, this.relaxLocality);
}
}
}
diff --git a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/EvaluatorRequestorImpl.java b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/EvaluatorRequestorImpl.java
index b2faf98..992659f 100644
--- a/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/EvaluatorRequestorImpl.java
+++ b/lang/java/reef-common/src/main/java/org/apache/reef/runtime/common/driver/EvaluatorRequestorImpl.java
@@ -29,6 +29,7 @@
import org.apache.reef.util.logging.LoggingScopeFactory;
import javax.inject.Inject;
+import java.util.Arrays;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -60,8 +61,11 @@
@Override
public synchronized void submit(final EvaluatorRequest req) {
- LOG.log(Level.FINEST, "Got an EvaluatorRequest: number: {0}, memory = {1}, cores = {2}.",
- new Object[] {req.getNumber(), req.getMegaBytes(), req.getNumberOfCores()});
+ if (LOG.isLoggable(Level.FINEST)) {
+ LOG.log(Level.FINEST, "Got an EvaluatorRequest: number: {0}, memory = {1}, cores = {2}.",
+ new Object[] {req.getNumber(), req.getMegaBytes(), req.getNumberOfCores()});
+ LOG.log(Level.FINEST, "Node names: " + Arrays.toString(req.getNodeNames().toArray()));
+ }
if (req.getMegaBytes() <= 0) {
throw new IllegalArgumentException("Given an unsupported memory size: " + req.getMegaBytes());
@@ -82,22 +86,18 @@
throw new IllegalArgumentException("Runtime name cannot be null");
}
// for backwards compatibility, we will always set the relax locality flag
- // to true unless the user configured racks, in which case we will check for
- // the ANY modifier (*), if not there, then we won't relax the locality
- boolean relaxLocality = true;
+ // to true unless the user has set it to false in the request, in which case
+ // we will check for the ANY modifier (*), if there, then we relax the
+ // locality regardless of the value set in the request.
+ boolean relaxLocality = req.getRelaxLocality();
if (!req.getRackNames().isEmpty()) {
for (final String rackName : req.getRackNames()) {
if (Constants.ANY_RACK.equals(rackName)) {
relaxLocality = true;
break;
}
- relaxLocality = false;
}
}
- // if the user specified any node, then we assume they do not want to relax locality
- if (!req.getNodeNames().isEmpty()) {
- relaxLocality = false;
- }
try (LoggingScope ls = this.loggingScopeFactory.evaluatorSubmit(req.getNumber())) {
final ResourceRequestEvent request = ResourceRequestEventImpl