[REEF-2041] Enable docker container support for REEF on Azure Batch (#1478)
* Enable Azure Batch containers
* Add container registry to parameters
* Address changes needed for REEF to run with docker containers as well as without the use of docker containers
* TcpPortList is List instead of comma separated string
* Removed isDockerContainer and IS_CONTAINER_BASED_POOL parameters as they are not needed
* Use TcpPortListString to represent list of ports
* Addressed all feedback except consolidating container based configuration into a separate class
* Consolidate docker registry settings into the ContainerRegistryProvider class
* Enable docker container support on Windows for Azure Batch
JIRA: [REEF-2041](https://issues.apache.org/jira/browse/REEF-2041)
Closes #1478
diff --git a/lang/cs/Org.Apache.REEF.Client/Avro/AzureBatch/AvroAzureBatchJobSubmissionParameters.cs b/lang/cs/Org.Apache.REEF.Client/Avro/AzureBatch/AvroAzureBatchJobSubmissionParameters.cs
index d0c1e14..88f8365 100644
--- a/lang/cs/Org.Apache.REEF.Client/Avro/AzureBatch/AvroAzureBatchJobSubmissionParameters.cs
+++ b/lang/cs/Org.Apache.REEF.Client/Avro/AzureBatch/AvroAzureBatchJobSubmissionParameters.cs
@@ -5,9 +5,9 @@
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
-//
+//
// http://www.apache.org/licenses/LICENSE-2.0
-//
+//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -22,16 +22,16 @@
namespace Org.Apache.REEF.Client.Avro.AzureBatch
{
/// <summary>
- /// Used to serialize and deserialize Avro record
+ /// Used to serialize and deserialize Avro record
/// org.apache.reef.reef.bridge.client.avro.AvroAzureBatchJobSubmissionParameters.
- /// This is a (mostly) auto-generated class.
+ /// This is a (mostly) auto-generated class.
/// For instructions on how to regenerate, please view the README.md in the same folder.
/// </summary>
[Private]
[DataContract(Namespace = "org.apache.reef.reef.bridge.client.avro")]
public sealed class AvroAzureBatchJobSubmissionParameters
{
- private const string JsonSchema = @"{""type"":""record"",""name"":""org.apache.reef.reef.bridge.client.avro.AvroAzureBatchJobSubmissionParameters"",""doc"":""Job submission parameters used by the Azure Batch runtime"",""fields"":[{""name"":""sharedJobSubmissionParameters"",""type"":{""type"":""record"",""name"":""org.apache.reef.reef.bridge.client.avro.AvroJobSubmissionParameters"",""doc"":""General cross-language job submission parameters shared by all runtimes"",""fields"":[{""name"":""jobId"",""type"":""string""},{""name"":""jobSubmissionFolder"",""type"":""string""}]}},{""name"":""AzureBatchAccountName"",""type"":""string""},{""name"":""AzureBatchAccountUri"",""type"":""string""},{""name"":""AzureBatchPoolId"",""type"":""string""},{""name"":""AzureStorageAccountName"",""type"":""string""},{""name"":""AzureStorageContainerName"",""type"":""string""},{""name"":""AzureBatchPoolDriverPortsList"",""type"":{""type"": ""array"", ""items"": ""string""}}]}";
+ private const string JsonSchema = @"{""type"":""record"",""name"":""org.apache.reef.reef.bridge.client.avro.AvroAzureBatchJobSubmissionParameters"",""doc"":""Job submission parameters used by the Azure Batch runtime"",""fields"":[{""name"":""sharedJobSubmissionParameters"",""type"":{""type"":""record"",""name"":""org.apache.reef.reef.bridge.client.avro.AvroJobSubmissionParameters"",""doc"":""General cross-language job submission parameters shared by all runtimes"",""fields"":[{""name"":""jobId"",""type"":""string""},{""name"":""jobSubmissionFolder"",""type"":""string""}]}},{""name"":""AzureBatchAccountName"",""type"":""string""},{""name"":""AzureBatchAccountUri"",""type"":""string""},{""name"":""AzureBatchPoolId"",""type"":""string""},{""name"":""AzureStorageAccountName"",""type"":""string""},{""name"":""AzureStorageContainerName"",""type"":""string""},{""name"":""AzureBatchPoolDriverPortsList"",""type"":{""type"": ""array"", ""items"": ""string""}},{""name"":""ContainerRegistryServer"",""type"":""string""},{""name"":""ContainerRegistryUsername"",""type"":""string""},{""name"":""ContainerRegistryPassword"",""type"":""string""},{""name"":""ContainerImageName"",""type"":""string""}]}";
/// <summary>
/// Gets the schema.
@@ -87,6 +87,30 @@
public IList<string> AzureBatchPoolDriverPortsList { get; set; }
/// <summary>
+ /// Gets or sets the ContainerRegistryServer field.
+ /// </summary>
+ [DataMember]
+ public string ContainerRegistryServer { get; set; }
+
+ /// <summary>
+ /// Gets or sets the ContainerRegistryUsername field.
+ /// </summary>
+ [DataMember]
+ public string ContainerRegistryUsername { get; set; }
+
+ /// <summary>
+ /// Gets or sets the ContainerRegistryPassword field.
+ /// </summary>
+ [DataMember]
+ public string ContainerRegistryPassword { get; set; }
+
+ /// <summary>
+ /// Gets or sets the ContainerImageName field.
+ /// </summary>
+ [DataMember]
+ public string ContainerImageName { get; set; }
+
+ /// <summary>
/// Initializes a new instance of the <see cref="AvroAzureBatchJobSubmissionParameters"/> class.
/// </summary>
public AvroAzureBatchJobSubmissionParameters()
diff --git a/lang/cs/Org.Apache.REEF.Client/AzureBatch/AzureBatchDotNetClient.cs b/lang/cs/Org.Apache.REEF.Client/AzureBatch/AzureBatchDotNetClient.cs
index d2721a4..8bde80c 100644
--- a/lang/cs/Org.Apache.REEF.Client/AzureBatch/AzureBatchDotNetClient.cs
+++ b/lang/cs/Org.Apache.REEF.Client/AzureBatch/AzureBatchDotNetClient.cs
@@ -18,8 +18,9 @@
using System;
using System.Threading.Tasks;
using Org.Apache.REEF.Client.API;
-using Org.Apache.REEF.Client.AzureBatch.Storage;
+using Org.Apache.REEF.Client.API.Parameters;
using Org.Apache.REEF.Client.AzureBatch;
+using Org.Apache.REEF.Client.AzureBatch.Storage;
using Org.Apache.REEF.Client.AzureBatch.Util;
using Org.Apache.REEF.Client.Common;
using Org.Apache.REEF.Client.YARN.RestClient.DataModel;
@@ -27,8 +28,6 @@
using Org.Apache.REEF.Tang.Annotations;
using Org.Apache.REEF.Tang.Interface;
using Org.Apache.REEF.Utilities.Logging;
-using Org.Apache.REEF.Client.AzureBatch.Parameters;
-using Org.Apache.REEF.Client.API.Parameters;
namespace Org.Apache.REEF.Client.DotNet.AzureBatch
{
@@ -62,7 +61,7 @@
AzureBatchService batchService,
JobJarMaker jobJarMaker,
//// Those parameters are used in AzureBatchJobSubmissionResult, but could not be injected there.
- //// It introduces circular injection issues, as all classes constructor inherited from JobSubmissionResult has reference to IREEFClient.
+ //// It introduces circular injection issues, as all classes constructor inherited from JobSubmissionResult has reference to IREEFClient.
//// TODO: [REEF-2020] Refactor IJobSubmissionResult Interface and JobSubmissionResult implementation
[Parameter(typeof(DriverHTTPConnectionRetryInterval))]int retryInterval,
[Parameter(typeof(DriverHTTPConnectionAttempts))] int numberOfRetries)
diff --git a/lang/cs/Org.Apache.REEF.Client/AzureBatch/AzureBatchRuntimeClientConfiguration.cs b/lang/cs/Org.Apache.REEF.Client/AzureBatch/AzureBatchRuntimeClientConfiguration.cs
index 195e2ce..2c55b5b 100644
--- a/lang/cs/Org.Apache.REEF.Client/AzureBatch/AzureBatchRuntimeClientConfiguration.cs
+++ b/lang/cs/Org.Apache.REEF.Client/AzureBatch/AzureBatchRuntimeClientConfiguration.cs
@@ -15,6 +15,9 @@
// specific language governing permissions and limitations
// under the License.
+using System;
+using System.Collections.Generic;
+using System.IO;
using Org.Apache.REEF.Client.API;
using Org.Apache.REEF.Client.API.Parameters;
using Org.Apache.REEF.Client.AzureBatch.Parameters;
@@ -22,9 +25,6 @@
using Org.Apache.REEF.Tang.Formats;
using Org.Apache.REEF.Tang.Interface;
using Org.Apache.REEF.Tang.Util;
-using System;
-using System.Collections.Generic;
-using System.IO;
namespace Org.Apache.REEF.Client.AzureBatch
{
@@ -49,6 +49,11 @@
public static readonly OptionalParameter<IList<string>> AzureBatchPoolDriverPortsList = new OptionalParameter<IList<string>>();
+ public static readonly OptionalParameter<string> ContainerRegistryServer = new OptionalParameter<string>();
+ public static readonly OptionalParameter<string> ContainerRegistryUsername = new OptionalParameter<string>();
+ public static readonly OptionalParameter<string> ContainerRegistryPassword = new OptionalParameter<string>();
+ public static readonly OptionalParameter<string> ContainerImageName = new OptionalParameter<string>();
+
public static ConfigurationModule ConfigurationModule = new AzureBatchRuntimeClientConfiguration()
.BindImplementation(GenericType<IREEFClient>.Class, GenericType<AzureBatchDotNetClient>.Class)
.BindNamedParameter(GenericType<AzureBatchAccountUri>.Class, AzureBatchAccountUri)
@@ -61,6 +66,10 @@
.BindNamedParameter(GenericType<DriverHTTPConnectionRetryInterval>.Class, DriverHTTPConnectionRetryInterval)
.BindNamedParameter(GenericType<DriverHTTPConnectionAttempts>.Class, DriverHTTPConnectionAttempts)
.BindNamedParameter(GenericType<AzureBatchPoolDriverPortsList>.Class, AzureBatchPoolDriverPortsList)
+ .BindNamedParameter(GenericType<ContainerRegistryServer>.Class, ContainerRegistryServer)
+ .BindNamedParameter(GenericType<ContainerRegistryUsername>.Class, ContainerRegistryUsername)
+ .BindNamedParameter(GenericType<ContainerRegistryPassword>.Class, ContainerRegistryPassword)
+ .BindNamedParameter(GenericType<ContainerImageName>.Class, ContainerImageName)
.Build();
public static IConfiguration FromTextFile(string file)
diff --git a/lang/cs/Org.Apache.REEF.Client/AzureBatch/Parameters/ContainerImageName.cs b/lang/cs/Org.Apache.REEF.Client/AzureBatch/Parameters/ContainerImageName.cs
new file mode 100644
index 0000000..d39590d
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Client/AzureBatch/Parameters/ContainerImageName.cs
@@ -0,0 +1,26 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using Org.Apache.REEF.Tang.Annotations;
+
+namespace Org.Apache.REEF.Client.AzureBatch.Parameters
+{
+ [NamedParameter(Documentation = "Docker Container Image Name", DefaultValue = "")]
+ public sealed class ContainerImageName : Name<string>
+ {
+ }
+}
diff --git a/lang/cs/Org.Apache.REEF.Client/AzureBatch/Parameters/ContainerRegistryPassword.cs b/lang/cs/Org.Apache.REEF.Client/AzureBatch/Parameters/ContainerRegistryPassword.cs
new file mode 100644
index 0000000..e75aa9e
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Client/AzureBatch/Parameters/ContainerRegistryPassword.cs
@@ -0,0 +1,26 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using Org.Apache.REEF.Tang.Annotations;
+
+namespace Org.Apache.REEF.Client.AzureBatch.Parameters
+{
+ [NamedParameter(Documentation = "Container Registry Password for Docker Images", DefaultValue = "")]
+ public sealed class ContainerRegistryPassword : Name<string>
+ {
+ }
+}
diff --git a/lang/cs/Org.Apache.REEF.Client/AzureBatch/Parameters/ContainerRegistryServer.cs b/lang/cs/Org.Apache.REEF.Client/AzureBatch/Parameters/ContainerRegistryServer.cs
new file mode 100644
index 0000000..ff9a002
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Client/AzureBatch/Parameters/ContainerRegistryServer.cs
@@ -0,0 +1,26 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using Org.Apache.REEF.Tang.Annotations;
+
+namespace Org.Apache.REEF.Client.AzureBatch.Parameters
+{
+ [NamedParameter(Documentation = "Container Registry Server for Docker Images", DefaultValue = "")]
+ public sealed class ContainerRegistryServer : Name<string>
+ {
+ }
+}
diff --git a/lang/cs/Org.Apache.REEF.Client/AzureBatch/Parameters/ContainerRegistryUsername.cs b/lang/cs/Org.Apache.REEF.Client/AzureBatch/Parameters/ContainerRegistryUsername.cs
new file mode 100644
index 0000000..6e4582d
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Client/AzureBatch/Parameters/ContainerRegistryUsername.cs
@@ -0,0 +1,26 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using Org.Apache.REEF.Tang.Annotations;
+
+namespace Org.Apache.REEF.Client.AzureBatch.Parameters
+{
+ [NamedParameter(Documentation = "Container Registry Username for Docker Images", DefaultValue = "")]
+ public sealed class ContainerRegistryUsername : Name<string>
+ {
+ }
+}
diff --git a/lang/cs/Org.Apache.REEF.Client/AzureBatch/Service/AzureBatchService.cs b/lang/cs/Org.Apache.REEF.Client/AzureBatch/Service/AzureBatchService.cs
index 075ce00..c6deb7b 100644
--- a/lang/cs/Org.Apache.REEF.Client/AzureBatch/Service/AzureBatchService.cs
+++ b/lang/cs/Org.Apache.REEF.Client/AzureBatch/Service/AzureBatchService.cs
@@ -17,6 +17,7 @@
using System;
using System.Collections.Generic;
+using System.Linq;
using System.Threading.Tasks;
using Microsoft.Azure.Batch;
using Microsoft.Azure.Batch.Common;
@@ -28,7 +29,7 @@
namespace Org.Apache.REEF.Client.DotNet.AzureBatch
{
- public sealed class AzureBatchService : IDisposable
+ internal sealed class AzureBatchService : IDisposable
{
private static readonly Logger LOGGER = Logger.GetLogger(typeof(AzureBatchService));
private static readonly TimeSpan RetryDeltaBackOff = TimeSpan.FromSeconds(5);
@@ -36,24 +37,36 @@
private const int MaxRetries = 3;
public BatchSharedKeyCredential Credentials { get; private set; }
- public string PoolId { get; private set; }
+ public string PoolId { get; }
- private BatchClient Client { get; set; }
+ private BatchClient Client { get; }
+ private ContainerRegistryProvider ContainerRegistryProvider { get; }
+ private IList<string> Ports { get; }
+ private ICommandBuilder CommandBuilder { get; }
+ private bool AreContainersEnabled => ContainerRegistryProvider.IsValid();
+
private bool disposed;
[Inject]
public AzureBatchService(
+ ContainerRegistryProvider containerRegistryProvider,
+ ICommandBuilder commandBuilder,
[Parameter(typeof(AzureBatchAccountUri))] string azureBatchAccountUri,
[Parameter(typeof(AzureBatchAccountName))] string azureBatchAccountName,
[Parameter(typeof(AzureBatchAccountKey))] string azureBatchAccountKey,
- [Parameter(typeof(AzureBatchPoolId))] string azureBatchPoolId)
+ [Parameter(typeof(AzureBatchPoolId))] string azureBatchPoolId,
+ [Parameter(typeof(AzureBatchPoolDriverPortsList))] IList<string> ports)
{
- BatchSharedKeyCredential credentials = new BatchSharedKeyCredential(azureBatchAccountUri, azureBatchAccountName, azureBatchAccountKey);
+ BatchSharedKeyCredential credentials =
+ new BatchSharedKeyCredential(azureBatchAccountUri, azureBatchAccountName, azureBatchAccountKey);
- this.Client = BatchClient.Open(credentials);
- this.Credentials = credentials;
- this.PoolId = azureBatchPoolId;
- this.Client.CustomBehaviors.Add(new RetryPolicyProvider(new ExponentialRetry(RetryDeltaBackOff, MaxRetries)));
+ Ports = ports;
+ Client = BatchClient.Open(credentials);
+ Credentials = credentials;
+ PoolId = azureBatchPoolId;
+ ContainerRegistryProvider = containerRegistryProvider;
+ Client.CustomBehaviors.Add(new RetryPolicyProvider(new ExponentialRetry(RetryDeltaBackOff, MaxRetries)));
+ CommandBuilder = commandBuilder;
}
/// <summary>
@@ -61,13 +74,13 @@
/// </summary>
public void Dispose()
{
- this.Dispose(true);
+ Dispose(true);
GC.SuppressFinalize(this);
}
~AzureBatchService()
{
- this.Dispose(false);
+ Dispose(false);
}
/// <summary>
@@ -75,26 +88,27 @@
/// </summary>
private void Dispose(bool disposing)
{
- if (this.disposed)
+ if (disposed)
{
return;
}
if (disposing)
{
- this.Client.Dispose();
+ Client.Dispose();
}
- this.disposed = true;
+ disposed = true;
}
#region Job related operations
public void CreateJob(string jobId, Uri resourceFile, string commandLine, string storageContainerSAS)
{
- CloudJob unboundJob = this.Client.JobOperations.CreateJob();
+ CloudJob unboundJob = Client.JobOperations.CreateJob();
unboundJob.Id = jobId;
- unboundJob.PoolInformation = new PoolInformation() { PoolId = this.PoolId };
+ unboundJob.PoolInformation = new PoolInformation() { PoolId = PoolId };
+ unboundJob.JobPreparationTask = CreateJobPreparationTask();
unboundJob.JobManagerTask = new JobManagerTask()
{
Id = jobId,
@@ -102,26 +116,70 @@
RunExclusive = false,
ResourceFiles = resourceFile != null
- ? new List<ResourceFile>() { new ResourceFile(resourceFile.AbsoluteUri, AzureBatchFileNames.GetTaskJarFileName()) }
+ ? new List<ResourceFile>()
+ {
+ new ResourceFile(resourceFile.AbsoluteUri, AzureBatchFileNames.GetTaskJarFileName())
+ }
: new List<ResourceFile>(),
- EnvironmentSettings = new List<EnvironmentSetting> { new EnvironmentSetting(AzureStorageContainerSasToken, storageContainerSAS) },
+ EnvironmentSettings = new List<EnvironmentSetting>
+ {
+ new EnvironmentSetting(AzureStorageContainerSasToken, storageContainerSAS)
+ },
// This setting will signal Batch to generate an access token and pass it
// to the Job Manager Task (aka the Driver) as an environment variable.
// For more info, see
// https://docs.microsoft.com/en-us/dotnet/api/microsoft.azure.batch.cloudtask.authenticationtokensettings
- AuthenticationTokenSettings = new AuthenticationTokenSettings() { Access = AccessScope.Job }
+ AuthenticationTokenSettings = new AuthenticationTokenSettings() { Access = AccessScope.Job },
+ ContainerSettings = CreateTaskContainerSettings(jobId),
};
+ if (AreContainersEnabled)
+ {
+ unboundJob.JobManagerTask.UserIdentity =
+ new UserIdentity(autoUserSpecification: new AutoUserSpecification(elevationLevel: ElevationLevel.Admin));
+ }
+
unboundJob.Commit();
LOGGER.Log(Level.Info, "Submitted job {0}, commandLine {1} ", jobId, commandLine);
}
+ private JobPreparationTask CreateJobPreparationTask()
+ {
+ if (!AreContainersEnabled)
+ {
+ return null;
+ }
+
+ return new JobPreparationTask()
+ {
+ Id = "CaptureHostIpAddress",
+ CommandLine = CommandBuilder.CaptureIpAddressCommandLine()
+ };
+ }
+
+ private TaskContainerSettings CreateTaskContainerSettings(string dockerContainerId)
+ {
+ if (!AreContainersEnabled)
+ {
+ return null;
+ }
+
+ string portMappings = Ports
+ .Aggregate(seed: string.Empty, func: (aggregator, port) => $"{aggregator} -p {port}:{port}");
+
+ return new TaskContainerSettings(
+ imageName: ContainerRegistryProvider.ContainerImageName,
+ containerRunOptions:
+ $"-d --rm --name {dockerContainerId} --env HOST_IP_ADDR_PATH={CommandBuilder.GetIpAddressFilePath()} {portMappings}",
+ registry: ContainerRegistryProvider.GetContainerRegistry());
+ }
+
public CloudJob GetJob(string jobId, DetailLevel detailLevel)
{
- using (Task<CloudJob> getJobTask = this.GetJobAsync(jobId, detailLevel))
+ using (Task<CloudJob> getJobTask = GetJobAsync(jobId, detailLevel))
{
getJobTask.Wait();
return getJobTask.Result;
@@ -130,20 +188,20 @@
public Task<CloudJob> GetJobAsync(string jobId, DetailLevel detailLevel)
{
- return this.Client.JobOperations.GetJobAsync(jobId, detailLevel);
+ return Client.JobOperations.GetJobAsync(jobId, detailLevel);
}
public CloudTask GetJobManagerTaskFromJobId(string jobId)
{
- string driverTaskId = this.Client.JobOperations.GetJob(jobId).JobManagerTask.Id;
- return this.Client.JobOperations.GetTask(jobId, driverTaskId);
+ string driverTaskId = Client.JobOperations.GetJob(jobId).JobManagerTask.Id;
+ return Client.JobOperations.GetTask(jobId, driverTaskId);
}
public ComputeNode GetComputeNodeFromNodeId(string nodeId)
{
- return this.Client.PoolOperations.GetComputeNode(this.PoolId, nodeId);
+ return Client.PoolOperations.GetComputeNode(PoolId, nodeId);
}
- #endregion
+ #endregion Job related operations
}
-}
\ No newline at end of file
+}
diff --git a/lang/cs/Org.Apache.REEF.Client/AzureBatch/Service/ContainerRegistryProvider.cs b/lang/cs/Org.Apache.REEF.Client/AzureBatch/Service/ContainerRegistryProvider.cs
new file mode 100644
index 0000000..6d8e3f5
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Client/AzureBatch/Service/ContainerRegistryProvider.cs
@@ -0,0 +1,63 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using Microsoft.Azure.Batch;
+using Org.Apache.REEF.Client.AzureBatch.Parameters;
+using Org.Apache.REEF.Tang.Annotations;
+
+namespace Org.Apache.REEF.Client.DotNet.AzureBatch
+{
+ public sealed class ContainerRegistryProvider
+ {
+ public string ContainerRegistryServer { get; }
+ public string ContainerRegistryUsername { get; }
+ public string ContainerRegistryPassword { get; }
+ public string ContainerImageName { get; }
+
+ [Inject]
+ public ContainerRegistryProvider(
+ [Parameter(typeof(ContainerRegistryServer))] string containerRegistryServer,
+ [Parameter(typeof(ContainerRegistryUsername))] string containerRegistryUsername,
+ [Parameter(typeof(ContainerRegistryPassword))] string containerRegistryPassword,
+ [Parameter(typeof(ContainerImageName))] string containerImageName
+ )
+ {
+ ContainerRegistryServer = containerRegistryServer;
+ ContainerRegistryUsername = containerRegistryUsername;
+ ContainerRegistryPassword = containerRegistryPassword;
+ ContainerImageName = containerImageName;
+ }
+
+ public bool IsValid()
+ {
+ return !string.IsNullOrEmpty(ContainerRegistryServer);
+ }
+
+ public ContainerRegistry GetContainerRegistry()
+ {
+ if (!IsValid())
+ {
+ return null;
+ }
+
+ return new ContainerRegistry(
+ userName: ContainerRegistryUsername,
+ registryServer: ContainerRegistryServer,
+ password: ContainerRegistryPassword);
+ }
+ }
+}
diff --git a/lang/cs/Org.Apache.REEF.Client/AzureBatch/Util/AbstractCommandBuilder.cs b/lang/cs/Org.Apache.REEF.Client/AzureBatch/Util/AbstractCommandBuilder.cs
index 4363bd7..83e6762 100644
--- a/lang/cs/Org.Apache.REEF.Client/AzureBatch/Util/AbstractCommandBuilder.cs
+++ b/lang/cs/Org.Apache.REEF.Client/AzureBatch/Util/AbstractCommandBuilder.cs
@@ -5,9 +5,9 @@
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
-//
+//
// http://www.apache.org/licenses/LICENSE-2.0
-//
+//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -63,6 +63,10 @@
return string.Format(_osCommandFormat, _commandPrefix + sb.ToString());
}
+ public abstract string CaptureIpAddressCommandLine();
+
+ public abstract string GetIpAddressFilePath();
+
/// <summary>
/// Returns the driver classpath string which is compatible with the intricacies of the OS.
/// </summary>
diff --git a/lang/cs/Org.Apache.REEF.Client/AzureBatch/Util/ICommandBuilder.cs b/lang/cs/Org.Apache.REEF.Client/AzureBatch/Util/ICommandBuilder.cs
index 23e83d4..9a4f88c 100644
--- a/lang/cs/Org.Apache.REEF.Client/AzureBatch/Util/ICommandBuilder.cs
+++ b/lang/cs/Org.Apache.REEF.Client/AzureBatch/Util/ICommandBuilder.cs
@@ -5,9 +5,9 @@
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
-//
+//
// http://www.apache.org/licenses/LICENSE-2.0
-//
+//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -31,5 +31,17 @@
/// <param name="driverMemory">The memory in megabytes used by driver.</param>
/// <returns>The command string.</returns>
string BuildDriverCommand(int driverMemory);
+
+ /// <summary>
+ /// Returns the path to a file where the ip address is persisted.
+ /// </summary>
+ /// <returns>Path to the file.</returns>
+ string GetIpAddressFilePath();
+
+ /// <summary>
+ /// Returns a command line that saves the ip address of the host to a file.
+ /// </summary>
+ /// <returns>The command line string.</returns>
+ string CaptureIpAddressCommandLine();
}
-}
\ No newline at end of file
+}
diff --git a/lang/cs/Org.Apache.REEF.Client/AzureBatch/Util/JobJarMaker.cs b/lang/cs/Org.Apache.REEF.Client/AzureBatch/Util/JobJarMaker.cs
index 4b02691..f1cbe42 100644
--- a/lang/cs/Org.Apache.REEF.Client/AzureBatch/Util/JobJarMaker.cs
+++ b/lang/cs/Org.Apache.REEF.Client/AzureBatch/Util/JobJarMaker.cs
@@ -15,6 +15,8 @@
// specific language governing permissions and limitations
// under the License.
+using System.Collections.Generic;
+using System.IO;
using Org.Apache.REEF.Client.API;
using Org.Apache.REEF.Client.Avro;
using Org.Apache.REEF.Client.Avro.AzureBatch;
@@ -23,9 +25,6 @@
using Org.Apache.REEF.Common.Avro;
using Org.Apache.REEF.Common.Files;
using Org.Apache.REEF.Tang.Annotations;
-using System;
-using System.Collections.Generic;
-using System.IO;
namespace Org.Apache.REEF.Client.AzureBatch.Util
{
@@ -37,7 +36,7 @@
private readonly REEFFileNames _fileNames;
[Inject]
- JobJarMaker(
+ private JobJarMaker(
IResourceArchiveFileGenerator resourceArchiveFileGenerator,
DriverFolderPreparationHelper driverFolderPreparationHelper,
REEFFileNames fileNames,
@@ -46,7 +45,11 @@
[Parameter(typeof(AzureBatchPoolId))] string azureBatchPoolId,
[Parameter(typeof(AzureStorageAccountName))] string azureStorageAccountName,
[Parameter(typeof(AzureStorageContainerName))] string azureStorageContainerName,
- [Parameter(typeof(AzureBatchPoolDriverPortsList))] List<string> azureBatchPoolDriverPortsList)
+ [Parameter(typeof(AzureBatchPoolDriverPortsList))] List<string> azureBatchPoolDriverPortsList,
+ [Parameter(typeof(ContainerRegistryServer))] string containerRegistryServer,
+ [Parameter(typeof(ContainerRegistryUsername))] string containerRegistryUsername,
+ [Parameter(typeof(ContainerRegistryPassword))] string containerRegistryPassword,
+ [Parameter(typeof(ContainerImageName))] string containerImageName)
{
_resourceArchiveFileGenerator = resourceArchiveFileGenerator;
_driverFolderPreparationHelper = driverFolderPreparationHelper;
@@ -59,6 +62,10 @@
AzureStorageAccountName = azureStorageAccountName,
AzureStorageContainerName = azureStorageContainerName,
AzureBatchPoolDriverPortsList = azureBatchPoolDriverPortsList,
+ ContainerRegistryServer = containerRegistryServer,
+ ContainerRegistryUsername = containerRegistryUsername,
+ ContainerRegistryPassword = containerRegistryPassword,
+ ContainerImageName = containerImageName,
};
}
diff --git a/lang/cs/Org.Apache.REEF.Client/AzureBatch/Util/LinuxCommandBuilder.cs b/lang/cs/Org.Apache.REEF.Client/AzureBatch/Util/LinuxCommandBuilder.cs
index eed2163..9780035 100644
--- a/lang/cs/Org.Apache.REEF.Client/AzureBatch/Util/LinuxCommandBuilder.cs
+++ b/lang/cs/Org.Apache.REEF.Client/AzureBatch/Util/LinuxCommandBuilder.cs
@@ -25,6 +25,7 @@
{
private static readonly string CommandPrefix =
"unzip " + AzureBatchFileNames.GetTaskJarFileName() + " -d 'reef/'" + ";";
+
private const string ClassPathSeparator = ":";
private const string OsCommandFormat = "/bin/sh c \"{0}\"";
@@ -40,5 +41,16 @@
{
throw new NotImplementedException();
}
+
+ public override string GetIpAddressFilePath()
+ {
+ return "$AZ_BATCH_JOB_PREP_WORKING_DIR/hostip.txt";
+ }
+
+ public override string CaptureIpAddressCommandLine()
+ {
+ string filePath = GetIpAddressFilePath();
+ return $"/bin/bash -c \"rm -f {filePath}; echo `hostname -i` > {filePath}\"";
+ }
}
}
\ No newline at end of file
diff --git a/lang/cs/Org.Apache.REEF.Client/AzureBatch/Util/WindowsCommandBuilder.cs b/lang/cs/Org.Apache.REEF.Client/AzureBatch/Util/WindowsCommandBuilder.cs
index b7d6d2f..1699d95 100644
--- a/lang/cs/Org.Apache.REEF.Client/AzureBatch/Util/WindowsCommandBuilder.cs
+++ b/lang/cs/Org.Apache.REEF.Client/AzureBatch/Util/WindowsCommandBuilder.cs
@@ -5,9 +5,9 @@
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
-//
+//
// http://www.apache.org/licenses/LICENSE-2.0
-//
+//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -27,6 +27,7 @@
"[System.IO.Compression.ZipFile]::ExtractToDirectory(\\\"$env:AZ_BATCH_TASK_WORKING_DIR\\" +
AzureBatchFileNames.GetTaskJarFileName() + "\\\", " +
"\\\"$env:AZ_BATCH_TASK_WORKING_DIR\\reef\\\");";
+
private const string ClassPathSeparator = ";";
private const string OsCommandFormat = "powershell.exe /c \"{0}\";";
@@ -48,5 +49,15 @@
return string.Format("'{0};'", string.Join(ClassPathSeparator, classpathList));
}
+
+ public override string GetIpAddressFilePath()
+ {
+ return "%AZ_BATCH_JOB_PREP_WORKING_DIR%\\hostip.txt";
+ }
+
+ public override string CaptureIpAddressCommandLine()
+ {
+ return $"powershell /c \"Set-Content -Path hostip.txt -Value ((Test-Connection -ComputerName $Env:ComputerName -Count 1).IPV4Address.IPAddressToString) -NoNewline -Force\"";
+ }
}
}
diff --git a/lang/cs/Org.Apache.REEF.Examples.HelloREEF/HelloREEF.cs b/lang/cs/Org.Apache.REEF.Examples.HelloREEF/HelloREEF.cs
index 4cb1c27..a995a7f 100644
--- a/lang/cs/Org.Apache.REEF.Examples.HelloREEF/HelloREEF.cs
+++ b/lang/cs/Org.Apache.REEF.Examples.HelloREEF/HelloREEF.cs
@@ -16,7 +16,6 @@
// under the License.
using System;
-using System.Collections.Generic;
using System.Globalization;
using Org.Apache.REEF.Client.API;
using Org.Apache.REEF.Client.AzureBatch;
@@ -98,10 +97,13 @@
return LocalRuntimeClientConfiguration.ConfigurationModule
.Set(LocalRuntimeClientConfiguration.NumberOfEvaluators, "2")
.Build();
+
case YARN:
return YARNClientConfiguration.ConfigurationModule.Build();
+
case YARNRest:
return YARNClientConfiguration.ConfigurationModuleYARNRest.Build();
+
case HDInsight:
// To run against HDInsight please replace placeholders below, with actual values for
// blob storage account name and key, container name (available at Azure portal) and HDInsight
@@ -117,6 +119,7 @@
.Set(AzureBlobFileSystemConfiguration.AccountName, blobStorageAccountName)
.Set(AzureBlobFileSystemConfiguration.AccountKey, blobStorageAccountKey)
.Build();
+
case AzureBatch:
return AzureBatchRuntimeClientConfiguration.ConfigurationModule
.Set(AzureBatchRuntimeClientConfiguration.AzureBatchAccountKey, @"##########################################")
@@ -128,9 +131,20 @@
.Set(AzureBatchRuntimeClientConfiguration.AzureStorageContainerName, @"###########")
//// Extend default retry interval in Azure Batch
.Set(AzureBatchRuntimeClientConfiguration.DriverHTTPConnectionRetryInterval, "20000")
- //// To allow Driver - Client communication, please specify the ports to use to set up driver http server.
- //// These ports must be defined in Azure Batch InBoundNATPool.
- .Set(AzureBatchRuntimeClientConfiguration.AzureBatchPoolDriverPortsList, new List<string>(new string[] { "123", "456" }))
+ //// Following list of ports is required to enable the following options:
+ //// 1. To enable communication between driver and client:
+ //// The ports will be used to set up driver http server endpoint.
+ //// In addition, these ports must also be defined in Azure Batch InBoundNATPool to enable communication.
+ //// 2. To enable communication between docker containers:
+ //// These ports will be mapped between the container and host to
+ //// allow the communication between the containers.
+ //// In addition, the Azure Batch pool must be created as a container pool to use this feature.
+ // .Set(AzureBatchRuntimeClientConfiguration.AzureBatchPoolDriverPortsList, new List<string> { "2000", "2001", "2002" })
+ //// Bind to Container Registry properties if present
+ // .Set(AzureBatchRuntimeClientConfiguration.ContainerRegistryServer, @"###############")
+ // .Set(AzureBatchRuntimeClientConfiguration.ContainerRegistryUsername, @"###############")
+ // .Set(AzureBatchRuntimeClientConfiguration.ContainerRegistryPassword, @"###############")
+ // .Set(AzureBatchRuntimeClientConfiguration.ContainerImageName, @"###############")
.Build();
default:
diff --git a/lang/java/reef-bridge-client/src/main/avro/JobSubmissionParameters.avsc b/lang/java/reef-bridge-client/src/main/avro/JobSubmissionParameters.avsc
index 92f215f..658e4d8 100644
--- a/lang/java/reef-bridge-client/src/main/avro/JobSubmissionParameters.avsc
+++ b/lang/java/reef-bridge-client/src/main/avro/JobSubmissionParameters.avsc
@@ -78,7 +78,11 @@
{ "name": "AzureBatchPoolId", "type": "string" },
{ "name": "AzureStorageAccountName", "type": "string" },
{ "name": "AzureStorageContainerName", "type": "string" },
- { "name": "AzureBatchPoolDriverPortsList", "type": {"type": "array", "items": "string"}}
+ { "name": "AzureBatchPoolDriverPortsList", "type": {"type": "array", "items": "string"}},
+ { "name": "ContainerRegistryServer", "type": "string" },
+ { "name": "ContainerRegistryUsername", "type": "string" },
+ { "name": "ContainerRegistryPassword", "type": "string" },
+ { "name": "ContainerImageName", "type": "string" }
]
}
]
diff --git a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/AzureBatchBootstrapREEFLauncher.java b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/AzureBatchBootstrapREEFLauncher.java
index 69e39f3..35a2e86 100644
--- a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/AzureBatchBootstrapREEFLauncher.java
+++ b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/AzureBatchBootstrapREEFLauncher.java
@@ -26,11 +26,7 @@
import org.apache.reef.runtime.azbatch.AzureBatchClasspathProvider;
import org.apache.reef.runtime.azbatch.AzureBatchJVMPathProvider;
import org.apache.reef.runtime.azbatch.client.AzureBatchDriverConfigurationProviderImpl;
-import org.apache.reef.runtime.azbatch.parameters.AzureBatchAccountName;
-import org.apache.reef.runtime.azbatch.parameters.AzureBatchAccountUri;
-import org.apache.reef.runtime.azbatch.parameters.AzureBatchPoolId;
-import org.apache.reef.runtime.azbatch.parameters.AzureStorageAccountName;
-import org.apache.reef.runtime.azbatch.parameters.AzureStorageContainerName;
+import org.apache.reef.runtime.azbatch.parameters.*;
import org.apache.reef.runtime.azbatch.util.command.CommandBuilder;
import org.apache.reef.runtime.azbatch.util.command.WindowsCommandBuilder;
import org.apache.reef.runtime.common.REEFEnvironment;
@@ -43,17 +39,13 @@
import org.apache.reef.tang.*;
import org.apache.reef.tang.exceptions.InjectionException;
import org.apache.reef.wake.remote.RemoteConfiguration;
-import org.apache.reef.wake.remote.ports.ListTcpPortProvider;
-import org.apache.reef.wake.remote.ports.TcpPortProvider;
-import org.apache.reef.wake.remote.ports.parameters.TcpPortList;
+import org.apache.reef.wake.remote.address.LocalAddressProvider;
+import org.apache.reef.wake.remote.ports.parameters.TcpPortSet;
import org.apache.reef.wake.time.Clock;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -95,20 +87,24 @@
TANG.newInjector(generateConfiguration(jobSubmissionParameters))
.getInstance(AzureBatchBootstrapDriverConfigGenerator.class);
+ final LocalAddressProvider defaultLocalAddressProvider =
+ Tang.Factory.getTang().newInjector().getInstance(LocalAddressProvider.class);
+
final JavaConfigurationBuilder launcherConfigBuilder =
TANG.newConfigurationBuilder()
.bindNamedParameter(RemoteConfiguration.ManagerName.class, "AzureBatchBootstrapREEFLauncher")
.bindNamedParameter(RemoteConfiguration.ErrorHandler.class, REEFErrorHandler.class)
.bindNamedParameter(RemoteConfiguration.MessageCodec.class, REEFMessageCodec.class)
+ .bindNamedParameter(RemoteConfiguration.HostAddress.class, defaultLocalAddressProvider.getLocalAddress())
.bindSetEntry(Clock.RuntimeStartHandler.class, PIDStoreStartHandler.class);
+
// Check if user has set up preferred ports to use.
// If set, we prefer will launch driver that binds those ports.
- final List<String> preferredPorts = asStringList(jobSubmissionParameters.getAzureBatchPoolDriverPortsList());
-
- if (preferredPorts.size() > 0) {
- launcherConfigBuilder.bindList(TcpPortList.class, preferredPorts)
- .bindImplementation(TcpPortProvider.class, ListTcpPortProvider.class);
+ if (jobSubmissionParameters.getAzureBatchPoolDriverPortsList().size() > 0) {
+ for (CharSequence port : jobSubmissionParameters.getAzureBatchPoolDriverPortsList()) {
+ launcherConfigBuilder.bindSetEntry(TcpPortSet.class, port.toString());
+ }
}
final Configuration launcherConfig = launcherConfigBuilder.build();
@@ -156,17 +152,17 @@
avroAzureBatchJobSubmissionParameters.getAzureStorageAccountName().toString())
.bindNamedParameter(AzureStorageContainerName.class,
avroAzureBatchJobSubmissionParameters.getAzureStorageContainerName().toString())
+ .bindNamedParameter(ContainerRegistryServer.class,
+ avroAzureBatchJobSubmissionParameters.getContainerRegistryServer().toString())
+ .bindNamedParameter(ContainerRegistryUsername.class,
+ avroAzureBatchJobSubmissionParameters.getContainerRegistryUsername().toString())
+ .bindNamedParameter(ContainerRegistryPassword.class,
+ avroAzureBatchJobSubmissionParameters.getContainerRegistryPassword().toString())
+ .bindNamedParameter(ContainerImageName.class,
+ avroAzureBatchJobSubmissionParameters.getContainerImageName().toString())
.build();
}
- private static List<String> asStringList(final Collection<? extends CharSequence> list) {
- final List<String> result = new ArrayList<>(list.size());
- for (final CharSequence sequence : list) {
- result.add(sequence.toString());
- }
- return result;
- }
-
private static RuntimeException fatal(final String msg, final Throwable t) {
LOG.log(Level.SEVERE, msg, t);
return new RuntimeException(msg, t);
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameServerImpl.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameServerImpl.java
index d52644c..68b75f1 100644
--- a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameServerImpl.java
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameServerImpl.java
@@ -33,6 +33,7 @@
import org.apache.reef.wake.remote.RemoteConfiguration;
import org.apache.reef.wake.remote.address.LocalAddressProvider;
import org.apache.reef.wake.remote.impl.TransportEvent;
+import org.apache.reef.wake.remote.ports.TcpPortProvider;
import org.apache.reef.wake.remote.transport.Transport;
import org.apache.reef.wake.remote.transport.netty.NettyMessagingTransport;
import org.apache.reef.webserver.ReefEventStateManager;
@@ -48,6 +49,7 @@
*/
public final class NameServerImpl implements NameServer {
+ private static final String UNKNOWN_HOST_NAME = "##UNKNOWN##";
private static final Logger LOG = Logger.getLogger(NameServer.class.getName());
private final Transport transport;
@@ -64,8 +66,10 @@
*/
@Inject
private NameServerImpl(
+ @Parameter(RemoteConfiguration.HostAddress.class) final String hostAddress,
@Parameter(NameServerParameters.NameServerPort.class) final int port,
@Parameter(NameServerParameters.NameServerIdentifierFactory.class) final IdentifierFactory factory,
+ final TcpPortProvider portProvider,
final LocalAddressProvider localAddressProvider) {
final Injector injector = Tang.Factory.getTang().newInjector();
@@ -75,8 +79,10 @@
final Codec<NamingMessage> codec = NamingCodecFactory.createFullCodec(factory);
final EventHandler<NamingMessage> handler = createEventHandler(codec);
- injector.bindVolatileParameter(RemoteConfiguration.HostAddress.class, localAddressProvider.getLocalAddress());
+ String host = UNKNOWN_HOST_NAME.equals(hostAddress) ? localAddressProvider.getLocalAddress() : hostAddress;
+ injector.bindVolatileParameter(RemoteConfiguration.HostAddress.class, host);
injector.bindVolatileParameter(RemoteConfiguration.Port.class, port);
+ injector.bindVolatileInstance(TcpPortProvider.class, portProvider);
injector.bindVolatileParameter(RemoteConfiguration.RemoteServerStage.class,
new SyncStage<>(new NamingServerHandler(handler, codec)));
diff --git a/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/client/AzureBatchDriverConfigurationProviderImpl.java b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/client/AzureBatchDriverConfigurationProviderImpl.java
index 0887e9d..adab2cb 100644
--- a/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/client/AzureBatchDriverConfigurationProviderImpl.java
+++ b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/client/AzureBatchDriverConfigurationProviderImpl.java
@@ -21,20 +21,25 @@
import org.apache.reef.annotations.audience.Private;
import org.apache.reef.runtime.azbatch.driver.AzureBatchDriverConfiguration;
import org.apache.reef.runtime.azbatch.driver.RuntimeIdentifier;
-import org.apache.reef.runtime.azbatch.parameters.AzureBatchAccountName;
-import org.apache.reef.runtime.azbatch.parameters.AzureBatchPoolId;
+import org.apache.reef.runtime.azbatch.parameters.*;
+import org.apache.reef.runtime.azbatch.util.batch.AzureBatchHelper;
+import org.apache.reef.runtime.azbatch.util.batch.ContainerRegistryProvider;
import org.apache.reef.runtime.azbatch.util.command.CommandBuilder;
-import org.apache.reef.runtime.azbatch.parameters.AzureBatchAccountUri;
-import org.apache.reef.runtime.azbatch.parameters.AzureStorageAccountName;
-import org.apache.reef.runtime.azbatch.parameters.AzureStorageContainerName;
import org.apache.reef.runtime.common.client.DriverConfigurationProvider;
import org.apache.reef.runtime.common.parameters.JVMHeapSlack;
import org.apache.reef.tang.Configuration;
import org.apache.reef.tang.Configurations;
import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.tang.formats.ConfigurationModuleBuilder;
+import org.apache.reef.wake.remote.address.ContainerBasedLocalAddressProvider;
+import org.apache.reef.wake.remote.address.LocalAddressProvider;
+import org.apache.reef.wake.remote.ports.SetTcpPortProvider;
+import org.apache.reef.wake.remote.ports.TcpPortProvider;
+import org.apache.reef.wake.remote.ports.parameters.TcpPortSet;
import javax.inject.Inject;
import java.net.URI;
+import java.util.Set;
/**
* Configuration provider for the Azure Batch runtime.
@@ -48,7 +53,9 @@
private final String azureBatchPoolId;
private final String azureStorageAccountName;
private final String azureStorageContainerName;
+ private final ContainerRegistryProvider containerRegistryProvider;
private final CommandBuilder commandBuilder;
+ private final Set<String> tcpPortSet;
@Inject
private AzureBatchDriverConfigurationProviderImpl(
@@ -58,6 +65,8 @@
@Parameter(AzureBatchPoolId.class) final String azureBatchPoolId,
@Parameter(AzureStorageAccountName.class) final String azureStorageAccountName,
@Parameter(AzureStorageContainerName.class) final String azureStorageContainerName,
+ @Parameter(TcpPortSet.class) final Set<Integer> tcpPortSet,
+ final ContainerRegistryProvider containerRegistryProvider,
final CommandBuilder commandBuilder) {
this.jvmSlack = jvmSlack;
this.azureBatchAccountUri = azureBatchAccountUri;
@@ -65,7 +74,11 @@
this.azureBatchPoolId = azureBatchPoolId;
this.azureStorageAccountName = azureStorageAccountName;
this.azureStorageContainerName = azureStorageContainerName;
+ this.containerRegistryProvider = containerRegistryProvider;
this.commandBuilder = commandBuilder;
+
+ // Binding a parameter to a set is only allowed for strings, so we cast to strings.
+ this.tcpPortSet = AzureBatchHelper.toStringSet(tcpPortSet);
}
/**
@@ -82,19 +95,36 @@
final String clientRemoteId,
final String jobId,
final Configuration applicationConfiguration) {
- return Configurations.merge(
- AzureBatchDriverConfiguration.CONF.getBuilder()
- .bindImplementation(CommandBuilder.class, this.commandBuilder.getClass()).build()
- .set(AzureBatchDriverConfiguration.JOB_IDENTIFIER, jobId)
- .set(AzureBatchDriverConfiguration.CLIENT_REMOTE_IDENTIFIER, clientRemoteId)
- .set(AzureBatchDriverConfiguration.JVM_HEAP_SLACK, this.jvmSlack)
- .set(AzureBatchDriverConfiguration.RUNTIME_NAME, RuntimeIdentifier.RUNTIME_NAME)
- .set(AzureBatchDriverConfiguration.AZURE_BATCH_ACCOUNT_URI, this.azureBatchAccountUri)
- .set(AzureBatchDriverConfiguration.AZURE_BATCH_ACCOUNT_NAME, this.azureBatchAccountName)
- .set(AzureBatchDriverConfiguration.AZURE_BATCH_POOL_ID, this.azureBatchPoolId)
- .set(AzureBatchDriverConfiguration.AZURE_STORAGE_ACCOUNT_NAME, this.azureStorageAccountName)
- .set(AzureBatchDriverConfiguration.AZURE_STORAGE_CONTAINER_NAME, this.azureStorageContainerName)
- .build(),
- applicationConfiguration);
+ ConfigurationModuleBuilder driverConfigurationBuilder = AzureBatchDriverConfiguration.CONF.getBuilder()
+ .bindImplementation(CommandBuilder.class, this.commandBuilder.getClass());
+
+ // If using docker containers, then use a different set of bindings
+ if (this.containerRegistryProvider.isValid()) {
+ driverConfigurationBuilder = driverConfigurationBuilder
+ .bindImplementation(LocalAddressProvider.class, ContainerBasedLocalAddressProvider.class)
+ .bindImplementation(TcpPortProvider.class, SetTcpPortProvider.class);
+ }
+
+ final Configuration driverConfiguration = driverConfigurationBuilder.build()
+ .set(AzureBatchDriverConfiguration.JOB_IDENTIFIER, jobId)
+ .set(AzureBatchDriverConfiguration.CLIENT_REMOTE_IDENTIFIER, clientRemoteId)
+ .set(AzureBatchDriverConfiguration.JVM_HEAP_SLACK, this.jvmSlack)
+ .set(AzureBatchDriverConfiguration.RUNTIME_NAME, RuntimeIdentifier.RUNTIME_NAME)
+ .set(AzureBatchDriverConfiguration.AZURE_BATCH_ACCOUNT_URI, this.azureBatchAccountUri)
+ .set(AzureBatchDriverConfiguration.AZURE_BATCH_ACCOUNT_NAME, this.azureBatchAccountName)
+ .set(AzureBatchDriverConfiguration.AZURE_BATCH_POOL_ID, this.azureBatchPoolId)
+ .set(AzureBatchDriverConfiguration.AZURE_STORAGE_ACCOUNT_NAME, this.azureStorageAccountName)
+ .set(AzureBatchDriverConfiguration.AZURE_STORAGE_CONTAINER_NAME, this.azureStorageContainerName)
+ .set(AzureBatchDriverConfiguration.CONTAINER_REGISTRY_SERVER,
+ this.containerRegistryProvider.getContainerRegistryServer())
+ .set(AzureBatchDriverConfiguration.CONTAINER_REGISTRY_USERNAME,
+ this.containerRegistryProvider.getContainerRegistryUsername())
+ .set(AzureBatchDriverConfiguration.CONTAINER_REGISTRY_PASSWORD,
+ this.containerRegistryProvider.getContainerRegistryPassword())
+ .set(AzureBatchDriverConfiguration.CONTAINER_IMAGE_NAME,
+ this.containerRegistryProvider.getContainerImageName())
+ .setMultiple(AzureBatchDriverConfiguration.TCP_PORT_SET, this.tcpPortSet)
+ .build();
+ return Configurations.merge(driverConfiguration, applicationConfiguration);
}
}
diff --git a/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/client/AzureBatchJobSubmissionHandler.java b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/client/AzureBatchJobSubmissionHandler.java
index cbb8a8c..95e2d24 100644
--- a/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/client/AzureBatchJobSubmissionHandler.java
+++ b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/client/AzureBatchJobSubmissionHandler.java
@@ -93,7 +93,7 @@
*/
@Override
public void close() throws Exception {
- LOG.log(Level.INFO, "Closing " + AzureBatchJobSubmissionHandler.class.getName());
+ LOG.log(Level.INFO, "Closing {0}", AzureBatchJobSubmissionHandler.class.getName());
}
/**
@@ -147,7 +147,7 @@
private String createApplicationId(final JobSubmissionEvent jobSubmissionEvent) {
String uuid = UUID.randomUUID().toString();
- String jobIdentifier = jobSubmissionEvent.getIdentifier();
+ String jobIdentifier = jobSubmissionEvent.getIdentifier();
String jobNameShort = jobIdentifier.length() + 1 + uuid.length() < MAX_CHARS_JOB_NAME ?
jobIdentifier : jobIdentifier.substring(0, MAX_CHARS_JOB_NAME - uuid.length() - 1);
return jobNameShort + "-" + uuid;
diff --git a/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/client/AzureBatchRuntimeConfiguration.java b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/client/AzureBatchRuntimeConfiguration.java
index 541f8c4..7489211 100644
--- a/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/client/AzureBatchRuntimeConfiguration.java
+++ b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/client/AzureBatchRuntimeConfiguration.java
@@ -23,6 +23,7 @@
import org.apache.reef.tang.formats.AvroConfigurationSerializer;
import org.apache.reef.tang.formats.ConfigurationModuleBuilder;
+import org.apache.reef.tang.formats.OptionalParameter;
import org.apache.reef.tang.formats.RequiredParameter;
import java.io.File;
@@ -75,6 +76,31 @@
public static final RequiredParameter<String> AZURE_STORAGE_CONTAINER_NAME = new RequiredParameter<>();
/**
+ * Container Registry server.
+ */
+ public static final OptionalParameter<String> CONTAINER_REGISTRY_SERVER = new OptionalParameter<>();
+
+ /**
+ * Container Registry username.
+ */
+ public static final OptionalParameter<String> CONTAINER_REGISTRY_USERNAME = new OptionalParameter<>();
+
+ /**
+ * Container Registry password.
+ */
+ public static final OptionalParameter<String> CONTAINER_REGISTRY_PASSWORD = new OptionalParameter<>();
+
+ /**
+ * Container Image name.
+ */
+ public static final OptionalParameter<String> CONTAINER_IMAGE_NAME = new OptionalParameter<>();
+
+ /**
+ * Set of tcp ports.
+ */
+ public static final OptionalParameter<Integer> TCP_PORT_SET = new OptionalParameter<>();
+
+ /**
* Create a {@link Configuration} object from an Avro configuration file.
*
* @param file the configuration file.
diff --git a/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/client/AzureBatchRuntimeConfigurationCreator.java b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/client/AzureBatchRuntimeConfigurationCreator.java
index 0d8860b..aa6e6d3 100644
--- a/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/client/AzureBatchRuntimeConfigurationCreator.java
+++ b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/client/AzureBatchRuntimeConfigurationCreator.java
@@ -19,18 +19,13 @@
package org.apache.reef.runtime.azbatch.client;
import org.apache.reef.annotations.audience.Private;
-import org.apache.reef.runtime.azbatch.parameters.AzureBatchAccountKey;
-import org.apache.reef.runtime.azbatch.parameters.AzureBatchAccountName;
-import org.apache.reef.runtime.azbatch.parameters.AzureBatchAccountUri;
-import org.apache.reef.runtime.azbatch.parameters.AzureBatchPoolId;
+import org.apache.reef.runtime.azbatch.parameters.*;
import org.apache.reef.runtime.azbatch.util.command.CommandBuilder;
import org.apache.reef.runtime.azbatch.util.command.LinuxCommandBuilder;
import org.apache.reef.runtime.azbatch.util.command.WindowsCommandBuilder;
-import org.apache.reef.runtime.azbatch.parameters.AzureStorageAccountKey;
-import org.apache.reef.runtime.azbatch.parameters.AzureStorageAccountName;
-import org.apache.reef.runtime.azbatch.parameters.AzureStorageContainerName;
import org.apache.reef.tang.formats.ConfigurationModule;
import org.apache.reef.tang.formats.ConfigurationModuleBuilder;
+import org.apache.reef.wake.remote.ports.parameters.TcpPortSet;
/**
* Class that builds the ConfigurationModule for Azure Batch runtime.
@@ -49,27 +44,35 @@
* @param isWindows true if Azure Batch pool nodes run Windows, false otherwise.
* @return the configuration module object.
*/
- public static ConfigurationModule getOrCreateAzureBatchRuntimeConfiguration(final boolean isWindows) {
+ public static ConfigurationModule getOrCreateAzureBatchRuntimeConfiguration(
+ final boolean isWindows) {
if (AzureBatchRuntimeConfigurationCreator.conf == null) {
ConfigurationModuleBuilder builder = AzureBatchRuntimeConfigurationStatic.CONF;
- ConfigurationModule module;
+
if (isWindows) {
- module = builder.bindImplementation(CommandBuilder.class, WindowsCommandBuilder.class).build();
+ builder = builder.bindImplementation(CommandBuilder.class, WindowsCommandBuilder.class);
} else {
- module = builder.bindImplementation(CommandBuilder.class, LinuxCommandBuilder.class).build();
+ builder = builder.bindImplementation(CommandBuilder.class, LinuxCommandBuilder.class);
}
AzureBatchRuntimeConfigurationCreator.conf = new AzureBatchRuntimeConfiguration()
- .merge(module)
+ .merge(builder.build())
.bindNamedParameter(AzureBatchAccountName.class, AzureBatchRuntimeConfiguration.AZURE_BATCH_ACCOUNT_NAME)
.bindNamedParameter(AzureBatchAccountUri.class, AzureBatchRuntimeConfiguration.AZURE_BATCH_ACCOUNT_URI)
.bindNamedParameter(AzureBatchAccountKey.class, AzureBatchRuntimeConfiguration.AZURE_BATCH_ACCOUNT_KEY)
.bindNamedParameter(AzureBatchPoolId.class, AzureBatchRuntimeConfiguration.AZURE_BATCH_POOL_ID)
.bindNamedParameter(AzureStorageAccountName.class, AzureBatchRuntimeConfiguration.AZURE_STORAGE_ACCOUNT_NAME)
.bindNamedParameter(AzureStorageAccountKey.class, AzureBatchRuntimeConfiguration.AZURE_STORAGE_ACCOUNT_KEY)
+ .bindNamedParameter(ContainerRegistryServer.class, AzureBatchRuntimeConfiguration.CONTAINER_REGISTRY_SERVER)
+ .bindNamedParameter(
+ ContainerRegistryUsername.class, AzureBatchRuntimeConfiguration.CONTAINER_REGISTRY_USERNAME)
+ .bindNamedParameter(
+ ContainerRegistryPassword.class, AzureBatchRuntimeConfiguration.CONTAINER_REGISTRY_PASSWORD)
+ .bindNamedParameter(ContainerImageName.class, AzureBatchRuntimeConfiguration.CONTAINER_IMAGE_NAME)
.bindNamedParameter(
AzureStorageContainerName.class, AzureBatchRuntimeConfiguration.AZURE_STORAGE_CONTAINER_NAME)
+ .bindSetEntry(TcpPortSet.class, AzureBatchRuntimeConfiguration.TCP_PORT_SET)
.build();
}
diff --git a/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/client/AzureBatchRuntimeConfigurationProvider.java b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/client/AzureBatchRuntimeConfigurationProvider.java
index c78e9e3..ce62be4 100644
--- a/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/client/AzureBatchRuntimeConfigurationProvider.java
+++ b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/client/AzureBatchRuntimeConfigurationProvider.java
@@ -20,17 +20,20 @@
import org.apache.reef.annotations.audience.Public;
import org.apache.reef.runtime.azbatch.parameters.*;
+import org.apache.reef.runtime.azbatch.util.batch.AzureBatchHelper;
+import org.apache.reef.runtime.azbatch.util.batch.ContainerRegistryProvider;
import org.apache.reef.tang.Configuration;
import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.wake.remote.ports.parameters.TcpPortSet;
import javax.inject.Inject;
+import java.util.Set;
/**
* Class that provides the runtime configuration for Azure Batch.
*/
@Public
public final class AzureBatchRuntimeConfigurationProvider {
-
private final String azureBatchAccountName;
private final String azureBatchAccountKey;
private final String azureBatchAccountUri;
@@ -38,7 +41,9 @@
private final String azureStorageAccountName;
private final String azureStorageAccountKey;
private final String azureStorageContainerName;
+ private final ContainerRegistryProvider containerRegistryProvider;
private final Boolean isWindows;
+ private final Set<String> tcpPortSet;
/**
* Private constructor.
@@ -52,7 +57,9 @@
@Parameter(AzureStorageAccountName.class) final String azureStorageAccountName,
@Parameter(AzureStorageAccountKey.class) final String azureStorageAccountKey,
@Parameter(AzureStorageContainerName.class) final String azureStorageContainerName,
- @Parameter(IsWindows.class) final Boolean isWindows) {
+ @Parameter(IsWindows.class) final Boolean isWindows,
+ @Parameter(TcpPortSet.class) final Set<Integer> tcpPortSet,
+ final ContainerRegistryProvider containerRegistryProvider) {
this.azureBatchAccountName = azureBatchAccountName;
this.azureBatchAccountKey = azureBatchAccountKey;
this.azureBatchAccountUri = azureBatchAccountUri;
@@ -60,7 +67,11 @@
this.azureStorageAccountName = azureStorageAccountName;
this.azureStorageAccountKey = azureStorageAccountKey;
this.azureStorageContainerName = azureStorageContainerName;
+ this.containerRegistryProvider = containerRegistryProvider;
this.isWindows = isWindows;
+
+ // Binding a parameter to a set is only allowed for strings, so we cast to strings.
+ this.tcpPortSet = AzureBatchHelper.toStringSet(tcpPortSet);
}
public Configuration getAzureBatchRuntimeConfiguration() {
@@ -73,6 +84,15 @@
.set(AzureBatchRuntimeConfiguration.AZURE_STORAGE_ACCOUNT_NAME, this.azureStorageAccountName)
.set(AzureBatchRuntimeConfiguration.AZURE_STORAGE_ACCOUNT_KEY, this.azureStorageAccountKey)
.set(AzureBatchRuntimeConfiguration.AZURE_STORAGE_CONTAINER_NAME, this.azureStorageContainerName)
+ .set(AzureBatchRuntimeConfiguration.CONTAINER_REGISTRY_SERVER,
+ this.containerRegistryProvider.getContainerRegistryServer())
+ .set(AzureBatchRuntimeConfiguration.CONTAINER_REGISTRY_USERNAME,
+ this.containerRegistryProvider.getContainerRegistryUsername())
+ .set(AzureBatchRuntimeConfiguration.CONTAINER_REGISTRY_PASSWORD,
+ this.containerRegistryProvider.getContainerRegistryPassword())
+ .set(AzureBatchRuntimeConfiguration.CONTAINER_IMAGE_NAME,
+ this.containerRegistryProvider.getContainerImageName())
+ .setMultiple(AzureBatchRuntimeConfiguration.TCP_PORT_SET, this.tcpPortSet)
.build();
}
}
diff --git a/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/driver/AzureBatchDriverConfiguration.java b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/driver/AzureBatchDriverConfiguration.java
index 9a096be..b8b8f18 100644
--- a/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/driver/AzureBatchDriverConfiguration.java
+++ b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/driver/AzureBatchDriverConfiguration.java
@@ -41,6 +41,7 @@
import org.apache.reef.tang.formats.ConfigurationModuleBuilder;
import org.apache.reef.tang.formats.OptionalParameter;
import org.apache.reef.tang.formats.RequiredParameter;
+import org.apache.reef.wake.remote.ports.parameters.TcpPortSet;
/**
* ConfigurationModule to create Azure Batch Driver configurations.
@@ -95,6 +96,31 @@
public static final RequiredParameter<String> AZURE_STORAGE_CONTAINER_NAME = new RequiredParameter<>();
/**
+ * Container Registry Server.
+ */
+ public static final OptionalParameter<String> CONTAINER_REGISTRY_SERVER = new OptionalParameter<>();
+
+ /**
+ * Container Registry Username.
+ */
+ public static final OptionalParameter<String> CONTAINER_REGISTRY_USERNAME = new OptionalParameter<>();
+
+ /**
+ * Container Registry password.
+ */
+ public static final OptionalParameter<String> CONTAINER_REGISTRY_PASSWORD = new OptionalParameter<>();
+
+ /**
+ * Container Image name.
+ */
+ public static final OptionalParameter<String> CONTAINER_IMAGE_NAME = new OptionalParameter<>();
+
+ /**
+ * Set of TCP Ports.
+ */
+ public static final OptionalParameter<Integer> TCP_PORT_SET = new OptionalParameter<>();
+
+ /**
* The fraction of the container memory NOT to use for the Java Heap.
*/
public static final OptionalParameter<Double> JVM_HEAP_SLACK = new OptionalParameter<>();
@@ -115,6 +141,12 @@
.bindNamedParameter(AzureStorageAccountName.class, AZURE_STORAGE_ACCOUNT_NAME)
.bindNamedParameter(AzureStorageContainerName.class, AZURE_STORAGE_CONTAINER_NAME)
+ // Bind Azure Container Parameters
+ .bindNamedParameter(ContainerRegistryServer.class, CONTAINER_REGISTRY_SERVER)
+ .bindNamedParameter(ContainerRegistryUsername.class, CONTAINER_REGISTRY_USERNAME)
+ .bindNamedParameter(ContainerRegistryPassword.class, CONTAINER_REGISTRY_PASSWORD)
+ .bindNamedParameter(ContainerImageName.class, CONTAINER_IMAGE_NAME)
+
// Bind the fields bound in AbstractDriverRuntimeConfiguration
.bindNamedParameter(JobIdentifier.class, JOB_IDENTIFIER)
.bindNamedParameter(LaunchID.class, JOB_IDENTIFIER)
@@ -125,5 +157,6 @@
.bindImplementation(RuntimeClasspathProvider.class, AzureBatchClasspathProvider.class)
.bindImplementation(RuntimePathProvider.class, AzureBatchJVMPathProvider.class)
.bindSetEntry(DefinedRuntimes.class, RUNTIME_NAME)
+ .bindSetEntry(TcpPortSet.class, TCP_PORT_SET)
.build();
}
diff --git a/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/driver/AzureBatchEvaluatorShimConfigurationProvider.java b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/driver/AzureBatchEvaluatorShimConfigurationProvider.java
index 85b9103..e3d8b1b 100644
--- a/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/driver/AzureBatchEvaluatorShimConfigurationProvider.java
+++ b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/driver/AzureBatchEvaluatorShimConfigurationProvider.java
@@ -20,10 +20,15 @@
import org.apache.reef.annotations.audience.Private;
import org.apache.reef.runtime.azbatch.evaluator.EvaluatorShimConfiguration;
+import org.apache.reef.runtime.azbatch.util.batch.AzureBatchHelper;
+import org.apache.reef.runtime.azbatch.util.batch.ContainerRegistryProvider;
import org.apache.reef.runtime.common.utils.RemoteManager;
import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.wake.remote.ports.parameters.TcpPortSet;
import javax.inject.Inject;
+import java.util.Set;
/**
* Configuration provider for the Azure Batch evaluator shim.
@@ -32,10 +37,18 @@
public class AzureBatchEvaluatorShimConfigurationProvider {
private final RemoteManager remoteManager;
+ private final Set<String> tcpPortSet;
+ private final ContainerRegistryProvider containerRegistryProvider;
@Inject
- AzureBatchEvaluatorShimConfigurationProvider(final RemoteManager remoteManager) {
+ AzureBatchEvaluatorShimConfigurationProvider(
+ @Parameter(TcpPortSet.class) final Set<Integer> tcpPortSet,
+ final ContainerRegistryProvider containerRegistryProvider,
+ final RemoteManager remoteManager) {
this.remoteManager = remoteManager;
+ this.containerRegistryProvider = containerRegistryProvider;
+ // Binding a parameter to a set is only allowed for strings, so we cast to strings.
+ this.tcpPortSet = AzureBatchHelper.toStringSet(tcpPortSet);
}
/**
@@ -46,9 +59,13 @@
* @return A {@link Configuration} object needed to launch the evaluator shim.
*/
public Configuration getConfiguration(final String containerId) {
- return EvaluatorShimConfiguration.CONF
+
+ return EvaluatorShimConfiguration
+ .getConfigurationModule(this.containerRegistryProvider.isValid())
.set(EvaluatorShimConfiguration.DRIVER_REMOTE_IDENTIFIER, this.remoteManager.getMyIdentifier())
.set(EvaluatorShimConfiguration.CONTAINER_IDENTIFIER, containerId)
+ .setMultiple(EvaluatorShimConfiguration.TCP_PORT_SET, this.tcpPortSet)
.build();
}
}
+
diff --git a/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/evaluator/EvaluatorShimConfiguration.java b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/evaluator/EvaluatorShimConfiguration.java
index c114ba2..be6428e 100644
--- a/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/evaluator/EvaluatorShimConfiguration.java
+++ b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/evaluator/EvaluatorShimConfiguration.java
@@ -25,8 +25,14 @@
import org.apache.reef.runtime.common.launch.REEFMessageCodec;
import org.apache.reef.tang.formats.ConfigurationModule;
import org.apache.reef.tang.formats.ConfigurationModuleBuilder;
+import org.apache.reef.tang.formats.OptionalParameter;
import org.apache.reef.tang.formats.RequiredParameter;
import org.apache.reef.wake.remote.RemoteConfiguration;
+import org.apache.reef.wake.remote.address.ContainerBasedLocalAddressProvider;
+import org.apache.reef.wake.remote.address.LocalAddressProvider;
+import org.apache.reef.wake.remote.ports.SetTcpPortProvider;
+import org.apache.reef.wake.remote.ports.TcpPortProvider;
+import org.apache.reef.wake.remote.ports.parameters.TcpPortSet;
/**
* ConfigurationModule to create evaluator shim configurations.
@@ -45,9 +51,28 @@
*/
public static final RequiredParameter<String> CONTAINER_IDENTIFIER = new RequiredParameter<>();
- public static final ConfigurationModule CONF = new EvaluatorShimConfiguration()
+ /**
+ * Set of TCP Ports.
+ */
+ public static final OptionalParameter<Integer> TCP_PORT_SET = new OptionalParameter<>();
+
+ private static final ConfigurationModule CONF = new EvaluatorShimConfiguration()
.bindNamedParameter(RemoteConfiguration.MessageCodec.class, REEFMessageCodec.class)
.bindNamedParameter(DriverRemoteIdentifier.class, DRIVER_REMOTE_IDENTIFIER)
.bindNamedParameter(ContainerIdentifier.class, CONTAINER_IDENTIFIER)
+ .bindSetEntry(TcpPortSet.class, TCP_PORT_SET)
.build();
+
+ public static ConfigurationModule getConfigurationModule(final boolean includeContainerConfiguration) {
+ ConfigurationModuleBuilder shimConfigurationBuilder = EvaluatorShimConfiguration.CONF.getBuilder();
+
+ // If using docker containers, then use a different set of bindings
+ if (includeContainerConfiguration) {
+ shimConfigurationBuilder = shimConfigurationBuilder
+ .bindImplementation(LocalAddressProvider.class, ContainerBasedLocalAddressProvider.class)
+ .bindImplementation(TcpPortProvider.class, SetTcpPortProvider.class);
+ }
+
+ return shimConfigurationBuilder.build();
+ }
}
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/parameters/TcpPortList.java b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/parameters/ContainerImageName.java
similarity index 71%
copy from lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/parameters/TcpPortList.java
copy to lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/parameters/ContainerImageName.java
index c7f4fbf..3d9ad07 100644
--- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/parameters/TcpPortList.java
+++ b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/parameters/ContainerImageName.java
@@ -16,22 +16,14 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.reef.wake.remote.ports.parameters;
+package org.apache.reef.runtime.azbatch.parameters;
import org.apache.reef.tang.annotations.Name;
import org.apache.reef.tang.annotations.NamedParameter;
-import java.util.List;
-
/**
- * An list of tcp port numbers to try.
+ * The container registry image name.
*/
-@NamedParameter(doc = "An list of tcp port numbers to try")
-public final class TcpPortList implements Name<List<Integer>> {
-
- /**
- * Empty private constructor to prohibit instantiation of utility class.
- */
- private TcpPortList() {
- }
+@NamedParameter(doc = "The container image name.", default_value = "")
+public final class ContainerImageName implements Name<String> {
}
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/parameters/TcpPortList.java b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/parameters/ContainerRegistryPassword.java
similarity index 70%
copy from lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/parameters/TcpPortList.java
copy to lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/parameters/ContainerRegistryPassword.java
index c7f4fbf..acff823 100644
--- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/parameters/TcpPortList.java
+++ b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/parameters/ContainerRegistryPassword.java
@@ -16,22 +16,17 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.reef.wake.remote.ports.parameters;
+package org.apache.reef.runtime.azbatch.parameters;
import org.apache.reef.tang.annotations.Name;
import org.apache.reef.tang.annotations.NamedParameter;
-import java.util.List;
+// TODO[JIRA REEF-TBD]: Secure the password by encrypting it and using
+// Azure Batch certificates to decrypt on the driver side.
/**
- * An list of tcp port numbers to try.
+ * The container registry password.
*/
-@NamedParameter(doc = "An list of tcp port numbers to try")
-public final class TcpPortList implements Name<List<Integer>> {
-
- /**
- * Empty private constructor to prohibit instantiation of utility class.
- */
- private TcpPortList() {
- }
+@NamedParameter(doc = "The container registry password.", default_value = "")
+public final class ContainerRegistryPassword implements Name<String> {
}
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/parameters/TcpPortList.java b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/parameters/ContainerRegistryServer.java
similarity index 71%
copy from lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/parameters/TcpPortList.java
copy to lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/parameters/ContainerRegistryServer.java
index c7f4fbf..a001869 100644
--- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/parameters/TcpPortList.java
+++ b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/parameters/ContainerRegistryServer.java
@@ -16,22 +16,14 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.reef.wake.remote.ports.parameters;
+package org.apache.reef.runtime.azbatch.parameters;
import org.apache.reef.tang.annotations.Name;
import org.apache.reef.tang.annotations.NamedParameter;
-import java.util.List;
-
/**
- * An list of tcp port numbers to try.
+ * The container registry server name.
*/
-@NamedParameter(doc = "An list of tcp port numbers to try")
-public final class TcpPortList implements Name<List<Integer>> {
-
- /**
- * Empty private constructor to prohibit instantiation of utility class.
- */
- private TcpPortList() {
- }
+@NamedParameter(doc = "The container registry server name.", default_value = "")
+public final class ContainerRegistryServer implements Name<String> {
}
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/parameters/TcpPortList.java b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/parameters/ContainerRegistryUsername.java
similarity index 71%
copy from lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/parameters/TcpPortList.java
copy to lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/parameters/ContainerRegistryUsername.java
index c7f4fbf..fe6b82e 100644
--- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/parameters/TcpPortList.java
+++ b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/parameters/ContainerRegistryUsername.java
@@ -16,22 +16,14 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.reef.wake.remote.ports.parameters;
+package org.apache.reef.runtime.azbatch.parameters;
import org.apache.reef.tang.annotations.Name;
import org.apache.reef.tang.annotations.NamedParameter;
-import java.util.List;
-
/**
- * An list of tcp port numbers to try.
+ * The container registry user name.
*/
-@NamedParameter(doc = "An list of tcp port numbers to try")
-public final class TcpPortList implements Name<List<Integer>> {
-
- /**
- * Empty private constructor to prohibit instantiation of utility class.
- */
- private TcpPortList() {
- }
+@NamedParameter(doc = "The container registry user name.", default_value = "")
+public final class ContainerRegistryUsername implements Name<String> {
}
diff --git a/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/util/batch/AzureBatchHelper.java b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/util/batch/AzureBatchHelper.java
index e0753df..f6e59ef 100644
--- a/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/util/batch/AzureBatchHelper.java
+++ b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/util/batch/AzureBatchHelper.java
@@ -21,18 +21,18 @@
import com.microsoft.azure.batch.BatchClient;
import com.microsoft.azure.batch.protocol.models.*;
-import org.apache.reef.runtime.azbatch.client.AzureBatchJobSubmissionHandler;
-import org.apache.reef.runtime.azbatch.parameters.AzureBatchPoolId;
+import org.apache.reef.runtime.azbatch.parameters.*;
import org.apache.reef.runtime.azbatch.util.AzureBatchFileNames;
+import org.apache.reef.runtime.azbatch.util.command.CommandBuilder;
import org.apache.reef.runtime.azbatch.util.storage.SharedAccessSignatureCloudBlobClientProvider;
import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.wake.remote.address.ContainerBasedLocalAddressProvider;
+import org.apache.reef.wake.remote.ports.parameters.TcpPortSet;
import javax.inject.Inject;
import java.io.IOException;
import java.net.URI;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
+import java.util.*;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -41,27 +41,39 @@
*/
public final class AzureBatchHelper {
- private static final Logger LOG = Logger.getLogger(AzureBatchJobSubmissionHandler.class.getName());
+ private static final Logger LOG = Logger.getLogger(AzureBatchHelper.class.getName());
/*
* Environment variable that contains the Azure Batch jobId.
*/
private static final String AZ_BATCH_JOB_ID_ENV = "AZ_BATCH_JOB_ID";
+ private static final String CAPTURE_HOST_IP_ADDRESS_TASK_NAME = "CaptureHostIpAddress";
private final AzureBatchFileNames azureBatchFileNames;
private final BatchClient client;
private final PoolInformation poolInfo;
+ private final CommandBuilder commandBuilder;
+ private final ContainerRegistryProvider containerRegistryProvider;
+ private final boolean areContainersEnabled;
+ private final Set<Integer> ports;
@Inject
public AzureBatchHelper(
final AzureBatchFileNames azureBatchFileNames,
final IAzureBatchCredentialProvider credentialProvider,
+ final CommandBuilder commandBuilder,
+ final ContainerRegistryProvider containerRegistryProvider,
+ @Parameter(TcpPortSet.class) final Set<Integer> ports,
@Parameter(AzureBatchPoolId.class) final String azureBatchPoolId) {
this.azureBatchFileNames = azureBatchFileNames;
this.client = BatchClient.open(credentialProvider.getCredentials());
this.poolInfo = new PoolInformation().withPoolId(azureBatchPoolId);
+ this.commandBuilder = commandBuilder;
+ this.containerRegistryProvider = containerRegistryProvider;
+ this.ports = ports;
+ this.areContainersEnabled = this.containerRegistryProvider.isValid();
}
/**
@@ -75,7 +87,7 @@
*/
public void submitJob(final String applicationId, final String storageContainerSAS, final URI jobJarUri,
final String command) throws IOException {
- ResourceFile jarResourceFile = new ResourceFile()
+ final ResourceFile jarResourceFile = new ResourceFile()
.withBlobSource(jobJarUri.toString())
.withFilePath(AzureBatchFileNames.getTaskJarFileName());
@@ -83,26 +95,29 @@
// as an environment variable.
// See https://docs.microsoft.com/en-us/dotnet/api/microsoft.azure.batch.cloudtask.authenticationtokensettings
// for more info.
- AuthenticationTokenSettings authenticationTokenSettings = new AuthenticationTokenSettings();
- authenticationTokenSettings.withAccess(Collections.singletonList(AccessScope.JOB));
+ final AuthenticationTokenSettings authenticationTokenSettings = new AuthenticationTokenSettings()
+ .withAccess(Collections.singletonList(AccessScope.JOB));
- EnvironmentSetting environmentSetting = new EnvironmentSetting()
+ final EnvironmentSetting environmentSetting = new EnvironmentSetting()
.withName(SharedAccessSignatureCloudBlobClientProvider.AZURE_STORAGE_CONTAINER_SAS_TOKEN_ENV)
.withValue(storageContainerSAS);
- JobManagerTask jobManagerTask = new JobManagerTask()
+ final JobManagerTask jobManagerTask = new JobManagerTask()
.withRunExclusive(false)
.withId(applicationId)
.withResourceFiles(Collections.singletonList(jarResourceFile))
.withEnvironmentSettings(Collections.singletonList(environmentSetting))
.withAuthenticationTokenSettings(authenticationTokenSettings)
+ .withKillJobOnCompletion(true)
+ .withContainerSettings(createTaskContainerSettings(applicationId))
.withCommandLine(command);
- LOG.log(Level.INFO, "Job Manager (aka driver) task command: " + command);
+ LOG.log(Level.INFO, "Job Manager (aka driver) task command: {0}", command);
- JobAddParameter jobAddParameter = new JobAddParameter()
+ final JobAddParameter jobAddParameter = new JobAddParameter()
.withId(applicationId)
.withJobManagerTask(jobManagerTask)
+ .withJobPreparationTask(createJobPreparationTask())
.withPoolInfo(poolInfo);
client.jobOperations().createJob(jobAddParameter);
@@ -134,13 +149,19 @@
.withFilePath(this.azureBatchFileNames.getEvaluatorShimConfigurationPath());
resources.add(confSourceFile);
- LOG.log(Level.INFO, "Evaluator task command: " + command);
+ LOG.log(Level.INFO, "Evaluator task command: {0}", command);
- final TaskAddParameter taskAddParameter = new TaskAddParameter()
+ TaskAddParameter taskAddParameter = new TaskAddParameter()
.withId(taskId)
.withResourceFiles(resources)
+ .withContainerSettings(createTaskContainerSettings(taskId))
.withCommandLine(command);
+ if (this.areContainersEnabled) {
+ taskAddParameter = taskAddParameter.withUserIdentity(
+ new UserIdentity().withAutoUser(new AutoUserSpecification().withElevationLevel(ElevationLevel.ADMIN)));
+ }
+
this.client.taskOperations().createTask(jobId, taskAddParameter);
}
@@ -169,4 +190,44 @@
public String getAzureBatchJobId() {
return System.getenv(AZ_BATCH_JOB_ID_ENV);
}
+
+ private TaskContainerSettings createTaskContainerSettings(final String dockerContainerId) {
+ if (!this.areContainersEnabled) {
+ return null;
+ }
+
+ final StringBuilder runOptions = new StringBuilder(String.format(
+ "-d --rm --name %s --env %s=%s ",
+ dockerContainerId,
+ ContainerBasedLocalAddressProvider.HOST_IP_ADDR_PATH_ENV,
+ this.commandBuilder.getIpAddressFilePath()));
+
+ for (final int port : this.ports) {
+ runOptions.append(String.format("-p %d:%d ", port, port));
+ }
+
+ return new TaskContainerSettings()
+ .withRegistry(this.containerRegistryProvider.getContainerRegistry())
+ .withImageName(this.containerRegistryProvider.getContainerImageName())
+ .withContainerRunOptions(runOptions.toString());
+ }
+
+ private JobPreparationTask createJobPreparationTask() {
+ if (!this.areContainersEnabled) {
+ return null;
+ }
+
+ final String captureIpAddressCommandLine = this.commandBuilder.captureIpAddressCommandLine();
+ return new JobPreparationTask()
+ .withId(CAPTURE_HOST_IP_ADDRESS_TASK_NAME)
+ .withCommandLine(captureIpAddressCommandLine);
+ }
+
+ public static Set<String> toStringSet(final Collection<?> items) {
+ final Set<String> set = new HashSet<>(items.size());
+ for (final Object elem: items) {
+ set.add(elem.toString());
+ }
+ return Collections.unmodifiableSet(set);
+ }
}
diff --git a/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/util/batch/ContainerRegistryProvider.java b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/util/batch/ContainerRegistryProvider.java
new file mode 100644
index 0000000..1bfafa9
--- /dev/null
+++ b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/util/batch/ContainerRegistryProvider.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.azbatch.util.batch;
+
+import com.microsoft.azure.batch.protocol.models.ContainerRegistry;
+import org.apache.commons.lang.StringUtils;
+import org.apache.reef.runtime.azbatch.parameters.ContainerImageName;
+import org.apache.reef.runtime.azbatch.parameters.ContainerRegistryPassword;
+import org.apache.reef.runtime.azbatch.parameters.ContainerRegistryServer;
+import org.apache.reef.runtime.azbatch.parameters.ContainerRegistryUsername;
+import org.apache.reef.tang.annotations.Parameter;
+
+import javax.inject.Inject;
+
+/**
+ * Class that holds the settings of the Azure container service settings.
+ */
+public final class ContainerRegistryProvider {
+
+ private final String containerRegistryServer;
+ private final String containerRegistryUsername;
+ private final String containerRegistryPassword;
+ private final String containerImageName;
+
+ @Inject
+ private ContainerRegistryProvider(
+ @Parameter(ContainerRegistryServer.class) final String containerRegistryServer,
+ @Parameter(ContainerRegistryUsername.class) final String containerRegistryUsername,
+ @Parameter(ContainerRegistryPassword.class) final String containerRegistryPassword,
+ @Parameter(ContainerImageName.class) final String containerImageName
+ ) {
+ this.containerRegistryServer = containerRegistryServer;
+ this.containerRegistryUsername = containerRegistryUsername;
+ this.containerRegistryPassword = containerRegistryPassword;
+ this.containerImageName = containerImageName;
+ }
+
+ public boolean isValid() {
+ return !StringUtils.isEmpty(this.containerRegistryServer);
+ }
+
+ public String getContainerRegistryServer() {
+ return this.containerRegistryServer;
+ }
+
+ public String getContainerRegistryUsername() {
+ return this.containerRegistryUsername;
+ }
+
+ public String getContainerRegistryPassword() {
+ return this.containerRegistryPassword;
+ }
+
+ public String getContainerImageName() {
+ return this.containerImageName;
+ }
+
+ public ContainerRegistry getContainerRegistry() {
+ if (!this.isValid()) {
+ throw new RuntimeException("Container registry is invalid");
+ }
+
+ return new ContainerRegistry()
+ .withRegistryServer(this.getContainerRegistryServer())
+ .withUserName(this.getContainerRegistryUsername())
+ .withPassword(this.getContainerRegistryPassword());
+ }
+}
diff --git a/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/util/command/CommandBuilder.java b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/util/command/CommandBuilder.java
index 96cd716..b05e010 100644
--- a/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/util/command/CommandBuilder.java
+++ b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/util/command/CommandBuilder.java
@@ -53,4 +53,18 @@
*/
String buildEvaluatorCommand(final ResourceLaunchEvent resourceLaunchEvent,
final int containerMemory, final double jvmHeapFactor);
+
+ /**
+ * Returns the path to a file where the ip address is persisted.
+ *
+ * @return path to the file.
+ */
+ String getIpAddressFilePath();
+
+ /**
+ * Returns a command line that saves the ip address of the host to a file.
+ *
+ * @return command line string.
+ */
+ String captureIpAddressCommandLine();
}
diff --git a/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/util/command/LinuxCommandBuilder.java b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/util/command/LinuxCommandBuilder.java
index adb58f5..0865662 100644
--- a/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/util/command/LinuxCommandBuilder.java
+++ b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/util/command/LinuxCommandBuilder.java
@@ -65,4 +65,15 @@
protected String getEvaluatorShimClasspath() {
return StringUtils.join(super.classpathProvider.getEvaluatorClasspath(), CLASSPATH_SEPARATOR_CHAR);
}
+
+ @Override
+ public String getIpAddressFilePath() {
+ return "$AZ_BATCH_JOB_PREP_WORKING_DIR/hostip.txt";
+ }
+
+ @Override
+ public String captureIpAddressCommandLine() {
+ final String filePath = getIpAddressFilePath();
+ return String.format("/bin/bash -c \"rm -f %s; echo `hostname -i` > %s\"", filePath, filePath);
+ }
}
diff --git a/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/util/command/WindowsCommandBuilder.java b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/util/command/WindowsCommandBuilder.java
index f382bef..ea0e097 100644
--- a/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/util/command/WindowsCommandBuilder.java
+++ b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/util/command/WindowsCommandBuilder.java
@@ -71,4 +71,16 @@
return String.format("'%s'", StringUtils.join(
super.classpathProvider.getEvaluatorClasspath(), CLASSPATH_SEPARATOR_CHAR));
}
+
+ @Override
+ public String getIpAddressFilePath() {
+ return "%AZ_BATCH_JOB_PREP_WORKING_DIR%\\hostip.txt";
+ }
+
+ @Override
+ public String captureIpAddressCommandLine() {
+ return String.format("powershell /c \"Set-Content -Path %s -Value "
+ + "((Test-Connection -ComputerName $Env:ComputerName -Count 1).IPV4Address.IPAddressToString) "
+ + " -NoNewline -Force\"", getIpAddressFilePath());
+ }
}
diff --git a/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/util/storage/SharedAccessSignatureCloudBlobClientProvider.java b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/util/storage/SharedAccessSignatureCloudBlobClientProvider.java
index e678445..a087488 100644
--- a/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/util/storage/SharedAccessSignatureCloudBlobClientProvider.java
+++ b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/util/storage/SharedAccessSignatureCloudBlobClientProvider.java
@@ -100,7 +100,7 @@
}
URI result = builder.addToURI(uri);
- LOG.log(Level.INFO, "Here's the URI: " + result);
+ LOG.log(Level.INFO, "Here's the URI: {0}", result);
return result;
} catch (StorageException | URISyntaxException e) {
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/address/ContainerBasedLocalAddressProvider.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/address/ContainerBasedLocalAddressProvider.java
new file mode 100644
index 0000000..2b63935
--- /dev/null
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/address/ContainerBasedLocalAddressProvider.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.wake.remote.address;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.Tang;
+
+import javax.inject.Inject;
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * A LocalAddressProvider that reads the contents of the file at HOST_IP_ADDR_PATH and uses it to be the ip address.
+ */
+public final class ContainerBasedLocalAddressProvider implements LocalAddressProvider {
+
+ private static final Pattern IPADDRESS_PATTERN = Pattern.compile(
+ "(?:(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\\.){3}(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)");
+
+ public static final String HOST_IP_ADDR_PATH_ENV = "HOST_IP_ADDR_PATH";
+ private static final Logger LOG = Logger.getLogger(ContainerBasedLocalAddressProvider.class.getName());
+
+ private String cachedLocalAddress = null;
+
+ /**
+ * The constructor is for Tang only.
+ */
+ @Inject
+ private ContainerBasedLocalAddressProvider() {
+ LOG.log(Level.FINE, "Instantiating ContainerBasedLocalAddressProvider");
+ }
+
+ @Override
+ public synchronized String getLocalAddress() {
+ if (cachedLocalAddress != null) {
+ return cachedLocalAddress;
+ }
+
+ final String ipAddressPath = System.getenv(HOST_IP_ADDR_PATH_ENV);
+ LOG.log(Level.FINE, "IpAddressPath is {0}", ipAddressPath);
+ if (StringUtils.isEmpty(ipAddressPath)) {
+ final String message = String.format("Environment variable must be set for %s", HOST_IP_ADDR_PATH_ENV);
+ LOG.log(Level.SEVERE, message);
+ throw new RuntimeException(message);
+ }
+
+ final File ipAddressFile = new File(ipAddressPath);
+ if (!ipAddressFile.exists() || !ipAddressFile.isFile()) {
+ final String message = String.format("%s points to invalid path: %s", HOST_IP_ADDR_PATH_ENV, ipAddressPath);
+ LOG.log(Level.SEVERE, message);
+ throw new RuntimeException(message);
+ }
+
+ try {
+ cachedLocalAddress = readFile(ipAddressPath, StandardCharsets.UTF_8);
+ return cachedLocalAddress;
+ } catch (IOException e) {
+ final String message = String.format("Exception when attempting to read file %s", ipAddressPath);
+ LOG.log(Level.SEVERE, message, e);
+ throw new RuntimeException(message, e);
+ }
+ }
+
+ @Override
+ public Configuration getConfiguration() {
+ return Tang.Factory.getTang().newConfigurationBuilder()
+ .bind(LocalAddressProvider.class, ContainerBasedLocalAddressProvider.class)
+ .build();
+ }
+
+ @Override
+ public String toString() {
+ return "ContainerBasedLocalAddressProvider:" + this.getLocalAddress();
+ }
+
+ private String readFile(final String path, final Charset encoding)
+ throws IOException {
+ final byte[] encoded = Files.readAllBytes(Paths.get(path));
+ final String ipString = new String(encoded, encoding);
+ final Matcher matcher = IPADDRESS_PATTERN.matcher(StringUtils.trim(ipString));
+
+ if (!matcher.matches()) {
+ throw new RuntimeException(String.format("File at location %s has invalid ip address", path));
+ }
+
+ return matcher.group();
+ }
+}
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultRemoteManagerImplementation.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultRemoteManagerImplementation.java
index 4566aca..2e4f3d3 100644
--- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultRemoteManagerImplementation.java
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultRemoteManagerImplementation.java
@@ -27,7 +27,6 @@
import org.apache.reef.wake.remote.ports.TcpPortProvider;
import org.apache.reef.wake.remote.transport.Transport;
import org.apache.reef.wake.remote.transport.TransportFactory;
-import org.apache.reef.wake.remote.transport.netty.NettyMessagingTransport;
import javax.inject.Inject;
import java.net.InetSocketAddress;
@@ -53,11 +52,6 @@
*/
private static final long CLOSE_EXECUTOR_TIMEOUT = 10000; //ms
- /**
- * Indicates a hostname that isn't set or known.
- */
- public static final String UNKNOWN_HOST_NAME = NettyMessagingTransport.UNKNOWN_HOST_NAME;
-
private final AtomicBoolean closed = new AtomicBoolean(false);
private final RemoteSeqNumGenerator seqGen = new RemoteSeqNumGenerator();
@@ -95,7 +89,10 @@
this.handlerContainer.setTransport(this.transport);
- this.myIdentifier = new SocketRemoteIdentifier((InetSocketAddress)this.transport.getLocalAddress());
+ InetSocketAddress address = new InetSocketAddress(
+ localAddressProvider.getLocalAddress(),
+ this.transport.getListeningPort());
+ this.myIdentifier = new SocketRemoteIdentifier(address);
this.reSendStage = new RemoteSenderStage(codec, this.transport, 10);
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/ListTcpPortProvider.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/SetTcpPortProvider.java
similarity index 69%
rename from lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/ListTcpPortProvider.java
rename to lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/SetTcpPortProvider.java
index c28a638..a6108af 100644
--- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/ListTcpPortProvider.java
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/SetTcpPortProvider.java
@@ -18,28 +18,27 @@
*/
package org.apache.reef.wake.remote.ports;
-
import org.apache.commons.lang.StringUtils;
import org.apache.reef.tang.annotations.Parameter;
-import org.apache.reef.wake.remote.ports.parameters.TcpPortList;
+import org.apache.reef.wake.remote.ports.parameters.TcpPortSet;
import javax.inject.Inject;
import java.util.Iterator;
-import java.util.List;
+import java.util.Set;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
- * A TcpPortProvider which gives out random ports in a range.
+ * A TcpPortProvider which gives out ports in a set.
*/
-public final class ListTcpPortProvider implements TcpPortProvider {
+public final class SetTcpPortProvider implements TcpPortProvider {
- private static final Logger LOG = Logger.getLogger(ListTcpPortProvider.class.getName());
- private final List<Integer> tcpPortList;
+ private static final Logger LOG = Logger.getLogger(SetTcpPortProvider.class.getName());
+ private final Set<Integer> tcpPortSet;
@Inject
- public ListTcpPortProvider(@Parameter(TcpPortList.class) final List<Integer> tcpPortList) {
- this.tcpPortList = tcpPortList;
+ public SetTcpPortProvider(@Parameter(TcpPortSet.class) final Set<Integer> tcpPortSet) {
+ this.tcpPortSet = tcpPortSet;
LOG.log(Level.FINE, "Instantiating {0}", this);
}
@@ -50,11 +49,11 @@
*/
@Override
public Iterator<Integer> iterator() {
- return this.tcpPortList.iterator();
+ return this.tcpPortSet.iterator();
}
@Override
public String toString() {
- return "ListTcpPortProvider{" + StringUtils.join(this.tcpPortList, ',') + '}';
+ return "SetTcpPortProvider{" + StringUtils.join(this.tcpPortSet, ',') + '}';
}
}
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/parameters/TcpPortList.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/parameters/TcpPortSet.java
similarity index 83%
rename from lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/parameters/TcpPortList.java
rename to lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/parameters/TcpPortSet.java
index c7f4fbf..47025e8 100644
--- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/parameters/TcpPortList.java
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/parameters/TcpPortSet.java
@@ -20,18 +20,17 @@
import org.apache.reef.tang.annotations.Name;
import org.apache.reef.tang.annotations.NamedParameter;
-
-import java.util.List;
+import java.util.Set;
/**
- * An list of tcp port numbers to try.
+ * A set of tcp port numbers to try.
*/
-@NamedParameter(doc = "An list of tcp port numbers to try")
-public final class TcpPortList implements Name<List<Integer>> {
+@NamedParameter(doc = "Set of tcp port numbers")
+public final class TcpPortSet implements Name<Set<Integer>> {
/**
* Empty private constructor to prohibit instantiation of utility class.
*/
- private TcpPortList() {
+ private TcpPortSet() {
}
}
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyMessagingTransport.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyMessagingTransport.java
index ea221ec..b51905e 100644
--- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyMessagingTransport.java
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyMessagingTransport.java
@@ -255,9 +255,9 @@
return link;
}
}
-
+
if (i == this.numberOfTries) {
- // Connection failure
+ // Connection failure
throw new ConnectException("Connection to " + remoteAddr + " refused");
}
@@ -323,7 +323,7 @@
}
}
}
-
+
return link;
}
diff --git a/lang/java/reef-webserver/src/main/java/org/apache/reef/webserver/HttpServerImpl.java b/lang/java/reef-webserver/src/main/java/org/apache/reef/webserver/HttpServerImpl.java
index 67dce8b..a10a4eb 100644
--- a/lang/java/reef-webserver/src/main/java/org/apache/reef/webserver/HttpServerImpl.java
+++ b/lang/java/reef-webserver/src/main/java/org/apache/reef/webserver/HttpServerImpl.java
@@ -21,16 +21,15 @@
import org.apache.reef.tang.annotations.Parameter;
import org.apache.reef.util.logging.LoggingScope;
import org.apache.reef.util.logging.LoggingScopeFactory;
+import org.apache.reef.wake.remote.RemoteConfiguration;
import org.apache.reef.wake.remote.address.LocalAddressProvider;
import org.apache.reef.wake.remote.ports.TcpPortProvider;
-import org.apache.reef.wake.remote.ports.parameters.TcpPortRangeBegin;
import org.mortbay.jetty.Connector;
import org.mortbay.jetty.Server;
import org.mortbay.jetty.bio.SocketConnector;
import javax.inject.Inject;
import java.net.BindException;
-import java.util.Iterator;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -39,6 +38,11 @@
*/
public final class HttpServerImpl implements HttpServer {
/**
+ * Indicates a hostname that isn't set or known.
+ */
+ private static final String UNKNOWN_HOST_NAME = "##UNKNOWN##";
+
+ /**
* Standard Java logger.
*/
private static final Logger LOG = Logger.getLogger(HttpServerImpl.class.getName());
@@ -64,35 +68,36 @@
private final LoggingScopeFactory loggingScopeFactory;
/**
- * The address provider for the HTTPServer.
+ * The host address for the HTTPServer.
*/
- private final LocalAddressProvider addressProvider;
+ private final String hostAddress;
/**
* Constructor of HttpServer that wraps Jetty Server.
*
* @param jettyHandler
- * @param portNumber
* @throws Exception
*/
@Inject
- HttpServerImpl(final JettyHandler jettyHandler,
+ HttpServerImpl(@Parameter(RemoteConfiguration.HostAddress.class) final String hostAddress,
+ final JettyHandler jettyHandler,
final LocalAddressProvider addressProvider,
- @Parameter(TcpPortRangeBegin.class)final int portNumber,
final TcpPortProvider tcpPortProvider,
final LoggingScopeFactory loggingScopeFactory) throws Exception {
- this.addressProvider = addressProvider;
this.loggingScopeFactory = loggingScopeFactory;
this.jettyHandler = jettyHandler;
- int availablePort = portNumber;
- Server srv = null;
+ this.hostAddress = UNKNOWN_HOST_NAME.equals(hostAddress) ? addressProvider.getLocalAddress() : hostAddress;
try (final LoggingScope ls = this.loggingScopeFactory.httpServer()) {
- final Iterator<Integer> ports = tcpPortProvider.iterator();
- while (ports.hasNext() && srv == null) {
- availablePort = ports.next();
- srv = tryPort(availablePort);
+ Server srv = null;
+ int availablePort = -1;
+ for (final int p : tcpPortProvider) {
+ srv = tryPort(p);
+ if (srv != null) {
+ availablePort = p;
+ break;
+ }
}
if (srv != null) {
@@ -109,7 +114,7 @@
private Server tryPort(final int portNumber) throws Exception {
Server srv = new Server();
final Connector connector = new SocketConnector();
- connector.setHost(addressProvider.getLocalAddress());
+ connector.setHost(this.hostAddress);
connector.setPort(portNumber);
srv.addConnector(connector);
try {
@@ -117,7 +122,8 @@
LOG.log(Level.INFO, "Jetty Server started with port: {0}", portNumber);
} catch (final BindException ex) {
srv = null;
- LOG.log(Level.FINEST, "Cannot use port: {0}. Will try another", portNumber);
+ LOG.log(Level.FINEST, "Cannot use host: {0},port: {1}. Will try another",
+ new Object[] {this.hostAddress, portNumber});
}
return srv;
}