[REEF-2041] Enable docker container support for REEF on Azure Batch (#1478)

* Enable Azure Batch containers
* Add container registry to parameters
* Address changes needed for REEF to run with docker containers as well as without the use of docker containers
* TcpPortList is List instead of comma separated string
* Removed isDockerContainer and IS_CONTAINER_BASED_POOL parameters as they are not needed
* Use TcpPortListString to represent list of ports
* Addressed all feedback except consolidating container based configuration into a separate class
* Consolidate docker registry settings into the ContainerRegistryProvider class
* Enable docker container support on Windows for Azure Batch

JIRA: [REEF-2041](https://issues.apache.org/jira/browse/REEF-2041)

Closes #1478 
diff --git a/lang/cs/Org.Apache.REEF.Client/Avro/AzureBatch/AvroAzureBatchJobSubmissionParameters.cs b/lang/cs/Org.Apache.REEF.Client/Avro/AzureBatch/AvroAzureBatchJobSubmissionParameters.cs
index d0c1e14..88f8365 100644
--- a/lang/cs/Org.Apache.REEF.Client/Avro/AzureBatch/AvroAzureBatchJobSubmissionParameters.cs
+++ b/lang/cs/Org.Apache.REEF.Client/Avro/AzureBatch/AvroAzureBatchJobSubmissionParameters.cs
@@ -5,9 +5,9 @@
 // to you under the Apache License, Version 2.0 (the
 // "License"); you may not use this file except in compliance
 // with the License.  You may obtain a copy of the License at
-// 
+//
 //   http://www.apache.org/licenses/LICENSE-2.0
-// 
+//
 // Unless required by applicable law or agreed to in writing,
 // software distributed under the License is distributed on an
 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -22,16 +22,16 @@
 namespace Org.Apache.REEF.Client.Avro.AzureBatch
 {
     /// <summary>
-    /// Used to serialize and deserialize Avro record 
+    /// Used to serialize and deserialize Avro record
     /// org.apache.reef.reef.bridge.client.avro.AvroAzureBatchJobSubmissionParameters.
-    /// This is a (mostly) auto-generated class. 
+    /// This is a (mostly) auto-generated class.
     /// For instructions on how to regenerate, please view the README.md in the same folder.
     /// </summary>
     [Private]
     [DataContract(Namespace = "org.apache.reef.reef.bridge.client.avro")]
     public sealed class AvroAzureBatchJobSubmissionParameters
     {
-        private const string JsonSchema = @"{""type"":""record"",""name"":""org.apache.reef.reef.bridge.client.avro.AvroAzureBatchJobSubmissionParameters"",""doc"":""Job submission parameters used by the Azure Batch runtime"",""fields"":[{""name"":""sharedJobSubmissionParameters"",""type"":{""type"":""record"",""name"":""org.apache.reef.reef.bridge.client.avro.AvroJobSubmissionParameters"",""doc"":""General cross-language job submission parameters shared by all runtimes"",""fields"":[{""name"":""jobId"",""type"":""string""},{""name"":""jobSubmissionFolder"",""type"":""string""}]}},{""name"":""AzureBatchAccountName"",""type"":""string""},{""name"":""AzureBatchAccountUri"",""type"":""string""},{""name"":""AzureBatchPoolId"",""type"":""string""},{""name"":""AzureStorageAccountName"",""type"":""string""},{""name"":""AzureStorageContainerName"",""type"":""string""},{""name"":""AzureBatchPoolDriverPortsList"",""type"":{""type"": ""array"", ""items"": ""string""}}]}";
+        private const string JsonSchema = @"{""type"":""record"",""name"":""org.apache.reef.reef.bridge.client.avro.AvroAzureBatchJobSubmissionParameters"",""doc"":""Job submission parameters used by the Azure Batch runtime"",""fields"":[{""name"":""sharedJobSubmissionParameters"",""type"":{""type"":""record"",""name"":""org.apache.reef.reef.bridge.client.avro.AvroJobSubmissionParameters"",""doc"":""General cross-language job submission parameters shared by all runtimes"",""fields"":[{""name"":""jobId"",""type"":""string""},{""name"":""jobSubmissionFolder"",""type"":""string""}]}},{""name"":""AzureBatchAccountName"",""type"":""string""},{""name"":""AzureBatchAccountUri"",""type"":""string""},{""name"":""AzureBatchPoolId"",""type"":""string""},{""name"":""AzureStorageAccountName"",""type"":""string""},{""name"":""AzureStorageContainerName"",""type"":""string""},{""name"":""AzureBatchPoolDriverPortsList"",""type"":{""type"": ""array"", ""items"": ""string""}},{""name"":""ContainerRegistryServer"",""type"":""string""},{""name"":""ContainerRegistryUsername"",""type"":""string""},{""name"":""ContainerRegistryPassword"",""type"":""string""},{""name"":""ContainerImageName"",""type"":""string""}]}";
 
         /// <summary>
         /// Gets the schema.
@@ -87,6 +87,30 @@
         public IList<string> AzureBatchPoolDriverPortsList { get; set; }
 
         /// <summary>
+        /// Gets or sets the ContainerRegistryServer field.
+        /// </summary>
+        [DataMember]
+        public string ContainerRegistryServer { get; set; }
+
+        /// <summary>
+        /// Gets or sets the ContainerRegistryUsername field.
+        /// </summary>
+        [DataMember]
+        public string ContainerRegistryUsername { get; set; }
+
+        /// <summary>
+        /// Gets or sets the ContainerRegistryPassword field.
+        /// </summary>
+        [DataMember]
+        public string ContainerRegistryPassword { get; set; }
+
+        /// <summary>
+        /// Gets or sets the ContainerImageName field.
+        /// </summary>
+        [DataMember]
+        public string ContainerImageName { get; set; }
+
+        /// <summary>
         /// Initializes a new instance of the <see cref="AvroAzureBatchJobSubmissionParameters"/> class.
         /// </summary>
         public AvroAzureBatchJobSubmissionParameters()
diff --git a/lang/cs/Org.Apache.REEF.Client/AzureBatch/AzureBatchDotNetClient.cs b/lang/cs/Org.Apache.REEF.Client/AzureBatch/AzureBatchDotNetClient.cs
index d2721a4..8bde80c 100644
--- a/lang/cs/Org.Apache.REEF.Client/AzureBatch/AzureBatchDotNetClient.cs
+++ b/lang/cs/Org.Apache.REEF.Client/AzureBatch/AzureBatchDotNetClient.cs
@@ -18,8 +18,9 @@
 using System;
 using System.Threading.Tasks;
 using Org.Apache.REEF.Client.API;
-using Org.Apache.REEF.Client.AzureBatch.Storage;
+using Org.Apache.REEF.Client.API.Parameters;
 using Org.Apache.REEF.Client.AzureBatch;
+using Org.Apache.REEF.Client.AzureBatch.Storage;
 using Org.Apache.REEF.Client.AzureBatch.Util;
 using Org.Apache.REEF.Client.Common;
 using Org.Apache.REEF.Client.YARN.RestClient.DataModel;
@@ -27,8 +28,6 @@
 using Org.Apache.REEF.Tang.Annotations;
 using Org.Apache.REEF.Tang.Interface;
 using Org.Apache.REEF.Utilities.Logging;
-using Org.Apache.REEF.Client.AzureBatch.Parameters;
-using Org.Apache.REEF.Client.API.Parameters;
 
 namespace Org.Apache.REEF.Client.DotNet.AzureBatch
 {
@@ -62,7 +61,7 @@
             AzureBatchService batchService,
             JobJarMaker jobJarMaker,
             //// Those parameters are used in AzureBatchJobSubmissionResult, but could not be injected there.
-            //// It introduces circular injection issues, as all classes constructor inherited from JobSubmissionResult has reference to IREEFClient. 
+            //// It introduces circular injection issues, as all classes constructor inherited from JobSubmissionResult has reference to IREEFClient.
             //// TODO: [REEF-2020] Refactor IJobSubmissionResult Interface and JobSubmissionResult implementation
             [Parameter(typeof(DriverHTTPConnectionRetryInterval))]int retryInterval,
             [Parameter(typeof(DriverHTTPConnectionAttempts))] int numberOfRetries)
diff --git a/lang/cs/Org.Apache.REEF.Client/AzureBatch/AzureBatchRuntimeClientConfiguration.cs b/lang/cs/Org.Apache.REEF.Client/AzureBatch/AzureBatchRuntimeClientConfiguration.cs
index 195e2ce..2c55b5b 100644
--- a/lang/cs/Org.Apache.REEF.Client/AzureBatch/AzureBatchRuntimeClientConfiguration.cs
+++ b/lang/cs/Org.Apache.REEF.Client/AzureBatch/AzureBatchRuntimeClientConfiguration.cs
@@ -15,6 +15,9 @@
 // specific language governing permissions and limitations
 // under the License.
 
+using System;
+using System.Collections.Generic;
+using System.IO;
 using Org.Apache.REEF.Client.API;
 using Org.Apache.REEF.Client.API.Parameters;
 using Org.Apache.REEF.Client.AzureBatch.Parameters;
@@ -22,9 +25,6 @@
 using Org.Apache.REEF.Tang.Formats;
 using Org.Apache.REEF.Tang.Interface;
 using Org.Apache.REEF.Tang.Util;
-using System;
-using System.Collections.Generic;
-using System.IO;
 
 namespace Org.Apache.REEF.Client.AzureBatch
 {
@@ -49,6 +49,11 @@
 
         public static readonly OptionalParameter<IList<string>> AzureBatchPoolDriverPortsList = new OptionalParameter<IList<string>>();
 
+        public static readonly OptionalParameter<string> ContainerRegistryServer = new OptionalParameter<string>();
+        public static readonly OptionalParameter<string> ContainerRegistryUsername = new OptionalParameter<string>();
+        public static readonly OptionalParameter<string> ContainerRegistryPassword = new OptionalParameter<string>();
+        public static readonly OptionalParameter<string> ContainerImageName = new OptionalParameter<string>();
+
         public static ConfigurationModule ConfigurationModule = new AzureBatchRuntimeClientConfiguration()
             .BindImplementation(GenericType<IREEFClient>.Class, GenericType<AzureBatchDotNetClient>.Class)
             .BindNamedParameter(GenericType<AzureBatchAccountUri>.Class, AzureBatchAccountUri)
@@ -61,6 +66,10 @@
             .BindNamedParameter(GenericType<DriverHTTPConnectionRetryInterval>.Class, DriverHTTPConnectionRetryInterval)
             .BindNamedParameter(GenericType<DriverHTTPConnectionAttempts>.Class, DriverHTTPConnectionAttempts)
             .BindNamedParameter(GenericType<AzureBatchPoolDriverPortsList>.Class, AzureBatchPoolDriverPortsList)
+            .BindNamedParameter(GenericType<ContainerRegistryServer>.Class, ContainerRegistryServer)
+            .BindNamedParameter(GenericType<ContainerRegistryUsername>.Class, ContainerRegistryUsername)
+            .BindNamedParameter(GenericType<ContainerRegistryPassword>.Class, ContainerRegistryPassword)
+            .BindNamedParameter(GenericType<ContainerImageName>.Class, ContainerImageName)
             .Build();
 
         public static IConfiguration FromTextFile(string file)
diff --git a/lang/cs/Org.Apache.REEF.Client/AzureBatch/Parameters/ContainerImageName.cs b/lang/cs/Org.Apache.REEF.Client/AzureBatch/Parameters/ContainerImageName.cs
new file mode 100644
index 0000000..d39590d
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Client/AzureBatch/Parameters/ContainerImageName.cs
@@ -0,0 +1,26 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using Org.Apache.REEF.Tang.Annotations;
+
+namespace Org.Apache.REEF.Client.AzureBatch.Parameters
+{
+    [NamedParameter(Documentation = "Docker Container Image Name", DefaultValue = "")]
+    public sealed class ContainerImageName : Name<string>
+    {
+    }
+}
diff --git a/lang/cs/Org.Apache.REEF.Client/AzureBatch/Parameters/ContainerRegistryPassword.cs b/lang/cs/Org.Apache.REEF.Client/AzureBatch/Parameters/ContainerRegistryPassword.cs
new file mode 100644
index 0000000..e75aa9e
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Client/AzureBatch/Parameters/ContainerRegistryPassword.cs
@@ -0,0 +1,26 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using Org.Apache.REEF.Tang.Annotations;
+
+namespace Org.Apache.REEF.Client.AzureBatch.Parameters
+{
+    [NamedParameter(Documentation = "Container Registry Password for Docker Images", DefaultValue = "")]
+    public sealed class ContainerRegistryPassword : Name<string>
+    {
+    }
+}
diff --git a/lang/cs/Org.Apache.REEF.Client/AzureBatch/Parameters/ContainerRegistryServer.cs b/lang/cs/Org.Apache.REEF.Client/AzureBatch/Parameters/ContainerRegistryServer.cs
new file mode 100644
index 0000000..ff9a002
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Client/AzureBatch/Parameters/ContainerRegistryServer.cs
@@ -0,0 +1,26 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using Org.Apache.REEF.Tang.Annotations;
+
+namespace Org.Apache.REEF.Client.AzureBatch.Parameters
+{
+    [NamedParameter(Documentation = "Container Registry Server for Docker Images", DefaultValue = "")]
+    public sealed class ContainerRegistryServer : Name<string>
+    {
+    }
+}
diff --git a/lang/cs/Org.Apache.REEF.Client/AzureBatch/Parameters/ContainerRegistryUsername.cs b/lang/cs/Org.Apache.REEF.Client/AzureBatch/Parameters/ContainerRegistryUsername.cs
new file mode 100644
index 0000000..6e4582d
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Client/AzureBatch/Parameters/ContainerRegistryUsername.cs
@@ -0,0 +1,26 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using Org.Apache.REEF.Tang.Annotations;
+
+namespace Org.Apache.REEF.Client.AzureBatch.Parameters
+{
+    [NamedParameter(Documentation = "Container Registry Username for Docker Images", DefaultValue = "")]
+    public sealed class ContainerRegistryUsername : Name<string>
+    {
+    }
+}
diff --git a/lang/cs/Org.Apache.REEF.Client/AzureBatch/Service/AzureBatchService.cs b/lang/cs/Org.Apache.REEF.Client/AzureBatch/Service/AzureBatchService.cs
index 075ce00..c6deb7b 100644
--- a/lang/cs/Org.Apache.REEF.Client/AzureBatch/Service/AzureBatchService.cs
+++ b/lang/cs/Org.Apache.REEF.Client/AzureBatch/Service/AzureBatchService.cs
@@ -17,6 +17,7 @@
 
 using System;
 using System.Collections.Generic;
+using System.Linq;
 using System.Threading.Tasks;
 using Microsoft.Azure.Batch;
 using Microsoft.Azure.Batch.Common;
@@ -28,7 +29,7 @@
 
 namespace Org.Apache.REEF.Client.DotNet.AzureBatch
 {
-    public sealed class AzureBatchService : IDisposable
+    internal sealed class AzureBatchService : IDisposable
     {
         private static readonly Logger LOGGER = Logger.GetLogger(typeof(AzureBatchService));
         private static readonly TimeSpan RetryDeltaBackOff = TimeSpan.FromSeconds(5);
@@ -36,24 +37,36 @@
         private const int MaxRetries = 3;
 
         public BatchSharedKeyCredential Credentials { get; private set; }
-        public string PoolId { get; private set; }
+        public string PoolId { get; }
 
-        private BatchClient Client { get; set; }
+        private BatchClient Client { get; }
+        private ContainerRegistryProvider ContainerRegistryProvider { get; }
+        private IList<string> Ports { get; }
+        private ICommandBuilder CommandBuilder { get; }
+        private bool AreContainersEnabled => ContainerRegistryProvider.IsValid();
+
         private bool disposed;
 
         [Inject]
         public AzureBatchService(
+            ContainerRegistryProvider containerRegistryProvider,
+            ICommandBuilder commandBuilder,
             [Parameter(typeof(AzureBatchAccountUri))] string azureBatchAccountUri,
             [Parameter(typeof(AzureBatchAccountName))] string azureBatchAccountName,
             [Parameter(typeof(AzureBatchAccountKey))] string azureBatchAccountKey,
-            [Parameter(typeof(AzureBatchPoolId))] string azureBatchPoolId)
+            [Parameter(typeof(AzureBatchPoolId))] string azureBatchPoolId,
+            [Parameter(typeof(AzureBatchPoolDriverPortsList))] IList<string> ports)
         {
-            BatchSharedKeyCredential credentials = new BatchSharedKeyCredential(azureBatchAccountUri, azureBatchAccountName, azureBatchAccountKey);
+            BatchSharedKeyCredential credentials =
+                new BatchSharedKeyCredential(azureBatchAccountUri, azureBatchAccountName, azureBatchAccountKey);
 
-            this.Client = BatchClient.Open(credentials);
-            this.Credentials = credentials;
-            this.PoolId = azureBatchPoolId;
-            this.Client.CustomBehaviors.Add(new RetryPolicyProvider(new ExponentialRetry(RetryDeltaBackOff, MaxRetries)));
+            Ports = ports;
+            Client = BatchClient.Open(credentials);
+            Credentials = credentials;
+            PoolId = azureBatchPoolId;
+            ContainerRegistryProvider = containerRegistryProvider;
+            Client.CustomBehaviors.Add(new RetryPolicyProvider(new ExponentialRetry(RetryDeltaBackOff, MaxRetries)));
+            CommandBuilder = commandBuilder;
         }
 
         /// <summary>
@@ -61,13 +74,13 @@
         /// </summary>
         public void Dispose()
         {
-            this.Dispose(true);
+            Dispose(true);
             GC.SuppressFinalize(this);
         }
 
         ~AzureBatchService()
         {
-            this.Dispose(false);
+            Dispose(false);
         }
 
         /// <summary>
@@ -75,26 +88,27 @@
         /// </summary>
         private void Dispose(bool disposing)
         {
-            if (this.disposed)
+            if (disposed)
             {
                 return;
             }
 
             if (disposing)
             {
-                this.Client.Dispose();
+                Client.Dispose();
             }
 
-            this.disposed = true;
+            disposed = true;
         }
 
         #region Job related operations
 
         public void CreateJob(string jobId, Uri resourceFile, string commandLine, string storageContainerSAS)
         {
-            CloudJob unboundJob = this.Client.JobOperations.CreateJob();
+            CloudJob unboundJob = Client.JobOperations.CreateJob();
             unboundJob.Id = jobId;
-            unboundJob.PoolInformation = new PoolInformation() { PoolId = this.PoolId };
+            unboundJob.PoolInformation = new PoolInformation() { PoolId = PoolId };
+            unboundJob.JobPreparationTask = CreateJobPreparationTask();
             unboundJob.JobManagerTask = new JobManagerTask()
             {
                 Id = jobId,
@@ -102,26 +116,70 @@
                 RunExclusive = false,
 
                 ResourceFiles = resourceFile != null
-                    ? new List<ResourceFile>() { new ResourceFile(resourceFile.AbsoluteUri, AzureBatchFileNames.GetTaskJarFileName()) }
+                    ? new List<ResourceFile>()
+                    {
+                        new ResourceFile(resourceFile.AbsoluteUri, AzureBatchFileNames.GetTaskJarFileName())
+                    }
                     : new List<ResourceFile>(),
 
-                EnvironmentSettings = new List<EnvironmentSetting> { new EnvironmentSetting(AzureStorageContainerSasToken, storageContainerSAS) },
+                EnvironmentSettings = new List<EnvironmentSetting>
+                {
+                    new EnvironmentSetting(AzureStorageContainerSasToken, storageContainerSAS)
+                },
 
                 // This setting will signal Batch to generate an access token and pass it
                 // to the Job Manager Task (aka the Driver) as an environment variable.
                 // For more info, see
                 // https://docs.microsoft.com/en-us/dotnet/api/microsoft.azure.batch.cloudtask.authenticationtokensettings
-                AuthenticationTokenSettings = new AuthenticationTokenSettings() { Access = AccessScope.Job }
+                AuthenticationTokenSettings = new AuthenticationTokenSettings() { Access = AccessScope.Job },
+                ContainerSettings = CreateTaskContainerSettings(jobId),
             };
 
+            if (AreContainersEnabled)
+            {
+                unboundJob.JobManagerTask.UserIdentity =
+                    new UserIdentity(autoUserSpecification: new AutoUserSpecification(elevationLevel: ElevationLevel.Admin));
+            }
+
             unboundJob.Commit();
 
             LOGGER.Log(Level.Info, "Submitted job {0}, commandLine {1} ", jobId, commandLine);
         }
 
+        private JobPreparationTask CreateJobPreparationTask()
+        {
+            if (!AreContainersEnabled)
+            {
+                return null;
+            }
+
+            return new JobPreparationTask()
+            {
+                Id = "CaptureHostIpAddress",
+                CommandLine = CommandBuilder.CaptureIpAddressCommandLine()
+            };
+        }
+
+        private TaskContainerSettings CreateTaskContainerSettings(string dockerContainerId)
+        {
+            if (!AreContainersEnabled)
+            {
+                return null;
+            }
+
+            string portMappings = Ports
+                .Aggregate(seed: string.Empty, func: (aggregator, port) => $"{aggregator} -p {port}:{port}");
+
+            return new TaskContainerSettings(
+                imageName: ContainerRegistryProvider.ContainerImageName,
+                containerRunOptions:
+                    $"-d --rm --name {dockerContainerId} --env HOST_IP_ADDR_PATH={CommandBuilder.GetIpAddressFilePath()} {portMappings}",
+                registry: ContainerRegistryProvider.GetContainerRegistry());
+        }
+
         public CloudJob GetJob(string jobId, DetailLevel detailLevel)
         {
-            using (Task<CloudJob> getJobTask = this.GetJobAsync(jobId, detailLevel))
+            using (Task<CloudJob> getJobTask = GetJobAsync(jobId, detailLevel))
             {
                 getJobTask.Wait();
                 return getJobTask.Result;
@@ -130,20 +188,20 @@
 
         public Task<CloudJob> GetJobAsync(string jobId, DetailLevel detailLevel)
         {
-            return this.Client.JobOperations.GetJobAsync(jobId, detailLevel);
+            return Client.JobOperations.GetJobAsync(jobId, detailLevel);
         }
 
         public CloudTask GetJobManagerTaskFromJobId(string jobId)
         {
-            string driverTaskId = this.Client.JobOperations.GetJob(jobId).JobManagerTask.Id;
-            return this.Client.JobOperations.GetTask(jobId, driverTaskId);
+            string driverTaskId = Client.JobOperations.GetJob(jobId).JobManagerTask.Id;
+            return Client.JobOperations.GetTask(jobId, driverTaskId);
         }
 
         public ComputeNode GetComputeNodeFromNodeId(string nodeId)
         {
-            return this.Client.PoolOperations.GetComputeNode(this.PoolId, nodeId);
+            return Client.PoolOperations.GetComputeNode(PoolId, nodeId);
         }
 
-        #endregion
+        #endregion Job related operations
     }
-}
\ No newline at end of file
+}
diff --git a/lang/cs/Org.Apache.REEF.Client/AzureBatch/Service/ContainerRegistryProvider.cs b/lang/cs/Org.Apache.REEF.Client/AzureBatch/Service/ContainerRegistryProvider.cs
new file mode 100644
index 0000000..6d8e3f5
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.Client/AzureBatch/Service/ContainerRegistryProvider.cs
@@ -0,0 +1,63 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using Microsoft.Azure.Batch;
+using Org.Apache.REEF.Client.AzureBatch.Parameters;
+using Org.Apache.REEF.Tang.Annotations;
+
+namespace Org.Apache.REEF.Client.DotNet.AzureBatch
+{
+    public sealed class ContainerRegistryProvider
+    {
+        public string ContainerRegistryServer { get; }
+        public string ContainerRegistryUsername { get; }
+        public string ContainerRegistryPassword { get; }
+        public string ContainerImageName { get; }
+
+        [Inject]
+        public ContainerRegistryProvider(
+            [Parameter(typeof(ContainerRegistryServer))] string containerRegistryServer,
+            [Parameter(typeof(ContainerRegistryUsername))] string containerRegistryUsername,
+            [Parameter(typeof(ContainerRegistryPassword))] string containerRegistryPassword,
+            [Parameter(typeof(ContainerImageName))] string containerImageName
+      )
+        {
+            ContainerRegistryServer = containerRegistryServer;
+            ContainerRegistryUsername = containerRegistryUsername;
+            ContainerRegistryPassword = containerRegistryPassword;
+            ContainerImageName = containerImageName;
+        }
+
+        public bool IsValid()
+        {
+            return !string.IsNullOrEmpty(ContainerRegistryServer);
+        }
+
+        public ContainerRegistry GetContainerRegistry()
+        {
+            if (!IsValid())
+            {
+                return null;
+            }
+
+            return new ContainerRegistry(
+                userName: ContainerRegistryUsername,
+                registryServer: ContainerRegistryServer,
+                password: ContainerRegistryPassword);
+        }
+    }
+}
diff --git a/lang/cs/Org.Apache.REEF.Client/AzureBatch/Util/AbstractCommandBuilder.cs b/lang/cs/Org.Apache.REEF.Client/AzureBatch/Util/AbstractCommandBuilder.cs
index 4363bd7..83e6762 100644
--- a/lang/cs/Org.Apache.REEF.Client/AzureBatch/Util/AbstractCommandBuilder.cs
+++ b/lang/cs/Org.Apache.REEF.Client/AzureBatch/Util/AbstractCommandBuilder.cs
@@ -5,9 +5,9 @@
 // to you under the Apache License, Version 2.0 (the
 // "License"); you may not use this file except in compliance
 // with the License.  You may obtain a copy of the License at
-// 
+//
 //   http://www.apache.org/licenses/LICENSE-2.0
-// 
+//
 // Unless required by applicable law or agreed to in writing,
 // software distributed under the License is distributed on an
 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -63,6 +63,10 @@
             return string.Format(_osCommandFormat, _commandPrefix + sb.ToString());
         }
 
+        public abstract string CaptureIpAddressCommandLine();
+
+        public abstract string GetIpAddressFilePath();
+
         /// <summary>
         /// Returns the driver classpath string which is compatible with the intricacies of the OS.
         /// </summary>
diff --git a/lang/cs/Org.Apache.REEF.Client/AzureBatch/Util/ICommandBuilder.cs b/lang/cs/Org.Apache.REEF.Client/AzureBatch/Util/ICommandBuilder.cs
index 23e83d4..9a4f88c 100644
--- a/lang/cs/Org.Apache.REEF.Client/AzureBatch/Util/ICommandBuilder.cs
+++ b/lang/cs/Org.Apache.REEF.Client/AzureBatch/Util/ICommandBuilder.cs
@@ -5,9 +5,9 @@
 // to you under the Apache License, Version 2.0 (the
 // "License"); you may not use this file except in compliance
 // with the License.  You may obtain a copy of the License at
-// 
+//
 //   http://www.apache.org/licenses/LICENSE-2.0
-// 
+//
 // Unless required by applicable law or agreed to in writing,
 // software distributed under the License is distributed on an
 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -31,5 +31,17 @@
         /// <param name="driverMemory">The memory in megabytes used by driver.</param>
         /// <returns>The command string.</returns>
         string BuildDriverCommand(int driverMemory);
+
+        /// <summary>
+        /// Returns the path to a file where the ip address is persisted.
+        /// </summary>
+        /// <returns>Path to the file.</returns>
+        string GetIpAddressFilePath();
+
+        /// <summary>
+        /// Returns a command line that saves the ip address of the host to a file.
+        /// </summary>
+        /// <returns>The command line string.</returns>
+        string CaptureIpAddressCommandLine();
     }
-}
\ No newline at end of file
+}
diff --git a/lang/cs/Org.Apache.REEF.Client/AzureBatch/Util/JobJarMaker.cs b/lang/cs/Org.Apache.REEF.Client/AzureBatch/Util/JobJarMaker.cs
index 4b02691..f1cbe42 100644
--- a/lang/cs/Org.Apache.REEF.Client/AzureBatch/Util/JobJarMaker.cs
+++ b/lang/cs/Org.Apache.REEF.Client/AzureBatch/Util/JobJarMaker.cs
@@ -15,6 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
+using System.Collections.Generic;
+using System.IO;
 using Org.Apache.REEF.Client.API;
 using Org.Apache.REEF.Client.Avro;
 using Org.Apache.REEF.Client.Avro.AzureBatch;
@@ -23,9 +25,6 @@
 using Org.Apache.REEF.Common.Avro;
 using Org.Apache.REEF.Common.Files;
 using Org.Apache.REEF.Tang.Annotations;
-using System;
-using System.Collections.Generic;
-using System.IO;
 
 namespace Org.Apache.REEF.Client.AzureBatch.Util
 {
@@ -37,7 +36,7 @@
         private readonly REEFFileNames _fileNames;
 
         [Inject]
-        JobJarMaker(
+        private JobJarMaker(
             IResourceArchiveFileGenerator resourceArchiveFileGenerator,
             DriverFolderPreparationHelper driverFolderPreparationHelper,
             REEFFileNames fileNames,
@@ -46,7 +45,11 @@
             [Parameter(typeof(AzureBatchPoolId))] string azureBatchPoolId,
             [Parameter(typeof(AzureStorageAccountName))] string azureStorageAccountName,
             [Parameter(typeof(AzureStorageContainerName))] string azureStorageContainerName,
-            [Parameter(typeof(AzureBatchPoolDriverPortsList))] List<string> azureBatchPoolDriverPortsList)
+            [Parameter(typeof(AzureBatchPoolDriverPortsList))] List<string> azureBatchPoolDriverPortsList,
+            [Parameter(typeof(ContainerRegistryServer))] string containerRegistryServer,
+            [Parameter(typeof(ContainerRegistryUsername))] string containerRegistryUsername,
+            [Parameter(typeof(ContainerRegistryPassword))] string containerRegistryPassword,
+            [Parameter(typeof(ContainerImageName))] string containerImageName)
         {
             _resourceArchiveFileGenerator = resourceArchiveFileGenerator;
             _driverFolderPreparationHelper = driverFolderPreparationHelper;
@@ -59,6 +62,10 @@
                 AzureStorageAccountName = azureStorageAccountName,
                 AzureStorageContainerName = azureStorageContainerName,
                 AzureBatchPoolDriverPortsList = azureBatchPoolDriverPortsList,
+                ContainerRegistryServer = containerRegistryServer,
+                ContainerRegistryUsername = containerRegistryUsername,
+                ContainerRegistryPassword = containerRegistryPassword,
+                ContainerImageName = containerImageName,
             };
         }
 
diff --git a/lang/cs/Org.Apache.REEF.Client/AzureBatch/Util/LinuxCommandBuilder.cs b/lang/cs/Org.Apache.REEF.Client/AzureBatch/Util/LinuxCommandBuilder.cs
index eed2163..9780035 100644
--- a/lang/cs/Org.Apache.REEF.Client/AzureBatch/Util/LinuxCommandBuilder.cs
+++ b/lang/cs/Org.Apache.REEF.Client/AzureBatch/Util/LinuxCommandBuilder.cs
@@ -25,6 +25,7 @@
     {
         private static readonly string CommandPrefix =
             "unzip " + AzureBatchFileNames.GetTaskJarFileName() + " -d 'reef/'" + ";";
+
         private const string ClassPathSeparator = ":";
         private const string OsCommandFormat = "/bin/sh c \"{0}\"";
 
@@ -40,5 +41,16 @@
         {
             throw new NotImplementedException();
         }
+
+        public override string GetIpAddressFilePath()
+        {
+            return "$AZ_BATCH_JOB_PREP_WORKING_DIR/hostip.txt";
+        }
+
+        public override string CaptureIpAddressCommandLine()
+        {
+            string filePath = GetIpAddressFilePath();
+            return $"/bin/bash -c \"rm -f {filePath}; echo `hostname -i` > {filePath}\"";
+        }
     }
 }
\ No newline at end of file
diff --git a/lang/cs/Org.Apache.REEF.Client/AzureBatch/Util/WindowsCommandBuilder.cs b/lang/cs/Org.Apache.REEF.Client/AzureBatch/Util/WindowsCommandBuilder.cs
index b7d6d2f..1699d95 100644
--- a/lang/cs/Org.Apache.REEF.Client/AzureBatch/Util/WindowsCommandBuilder.cs
+++ b/lang/cs/Org.Apache.REEF.Client/AzureBatch/Util/WindowsCommandBuilder.cs
@@ -5,9 +5,9 @@
 // to you under the Apache License, Version 2.0 (the
 // "License"); you may not use this file except in compliance
 // with the License.  You may obtain a copy of the License at
-// 
+//
 //   http://www.apache.org/licenses/LICENSE-2.0
-// 
+//
 // Unless required by applicable law or agreed to in writing,
 // software distributed under the License is distributed on an
 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -27,6 +27,7 @@
           "[System.IO.Compression.ZipFile]::ExtractToDirectory(\\\"$env:AZ_BATCH_TASK_WORKING_DIR\\" +
               AzureBatchFileNames.GetTaskJarFileName() + "\\\", " +
               "\\\"$env:AZ_BATCH_TASK_WORKING_DIR\\reef\\\");";
+
         private const string ClassPathSeparator = ";";
         private const string OsCommandFormat = "powershell.exe /c \"{0}\";";
 
@@ -48,5 +49,15 @@
 
             return string.Format("'{0};'", string.Join(ClassPathSeparator, classpathList));
         }
+
+        public override string GetIpAddressFilePath()
+        {
+            return "%AZ_BATCH_JOB_PREP_WORKING_DIR%\\hostip.txt";
+        }
+
+        public override string CaptureIpAddressCommandLine()
+        {
+            return $"powershell /c \"Set-Content -Path hostip.txt -Value ((Test-Connection -ComputerName $Env:ComputerName -Count 1).IPV4Address.IPAddressToString) -NoNewline -Force\"";
+        }
     }
 }
diff --git a/lang/cs/Org.Apache.REEF.Examples.HelloREEF/HelloREEF.cs b/lang/cs/Org.Apache.REEF.Examples.HelloREEF/HelloREEF.cs
index 4cb1c27..a995a7f 100644
--- a/lang/cs/Org.Apache.REEF.Examples.HelloREEF/HelloREEF.cs
+++ b/lang/cs/Org.Apache.REEF.Examples.HelloREEF/HelloREEF.cs
@@ -16,7 +16,6 @@
 // under the License.
 
 using System;
-using System.Collections.Generic;
 using System.Globalization;
 using Org.Apache.REEF.Client.API;
 using Org.Apache.REEF.Client.AzureBatch;
@@ -98,10 +97,13 @@
                     return LocalRuntimeClientConfiguration.ConfigurationModule
                         .Set(LocalRuntimeClientConfiguration.NumberOfEvaluators, "2")
                         .Build();
+
                 case YARN:
                     return YARNClientConfiguration.ConfigurationModule.Build();
+
                 case YARNRest:
                     return YARNClientConfiguration.ConfigurationModuleYARNRest.Build();
+
                 case HDInsight:
                     // To run against HDInsight please replace placeholders below, with actual values for
                     // blob storage account name and key, container name (available at Azure portal) and HDInsight
@@ -117,6 +119,7 @@
                         .Set(AzureBlobFileSystemConfiguration.AccountName, blobStorageAccountName)
                         .Set(AzureBlobFileSystemConfiguration.AccountKey, blobStorageAccountKey)
                         .Build();
+
                 case AzureBatch:
                     return AzureBatchRuntimeClientConfiguration.ConfigurationModule
                         .Set(AzureBatchRuntimeClientConfiguration.AzureBatchAccountKey, @"##########################################")
@@ -128,9 +131,20 @@
                         .Set(AzureBatchRuntimeClientConfiguration.AzureStorageContainerName, @"###########")
                         //// Extend default retry interval in Azure Batch
                         .Set(AzureBatchRuntimeClientConfiguration.DriverHTTPConnectionRetryInterval, "20000")
-                        //// To allow Driver - Client communication, please specify the ports to use to set up driver http server.
-                        //// These ports must be defined in Azure Batch InBoundNATPool.
-                        .Set(AzureBatchRuntimeClientConfiguration.AzureBatchPoolDriverPortsList, new List<string>(new string[] { "123", "456" }))
+                        //// Following list of ports is required to enable the following options:
+                        //// 1. To enable communication between driver and client:
+                        ////        The ports will be used to set up driver http server endpoint.
+                        ////        In addition, these ports must also be defined in Azure Batch InBoundNATPool to enable communication.
+                        //// 2. To enable communication between docker containers:
+                        ////        These ports will be mapped between the container and host to
+                        ////        allow the communication between the containers.
+                        ////        In addition, the Azure Batch pool must be created as a container pool to use this feature.
+                        // .Set(AzureBatchRuntimeClientConfiguration.AzureBatchPoolDriverPortsList, new List<string> { "2000", "2001", "2002" })
+                        //// Bind to Container Registry properties if present
+                        // .Set(AzureBatchRuntimeClientConfiguration.ContainerRegistryServer, @"###############")
+                        // .Set(AzureBatchRuntimeClientConfiguration.ContainerRegistryUsername, @"###############")
+                        // .Set(AzureBatchRuntimeClientConfiguration.ContainerRegistryPassword, @"###############")
+                        // .Set(AzureBatchRuntimeClientConfiguration.ContainerImageName, @"###############")
                         .Build();
 
                 default:
diff --git a/lang/java/reef-bridge-client/src/main/avro/JobSubmissionParameters.avsc b/lang/java/reef-bridge-client/src/main/avro/JobSubmissionParameters.avsc
index 92f215f..658e4d8 100644
--- a/lang/java/reef-bridge-client/src/main/avro/JobSubmissionParameters.avsc
+++ b/lang/java/reef-bridge-client/src/main/avro/JobSubmissionParameters.avsc
@@ -78,7 +78,11 @@
         { "name": "AzureBatchPoolId", "type": "string" },
         { "name": "AzureStorageAccountName", "type": "string" },
         { "name": "AzureStorageContainerName", "type": "string" },
-        { "name": "AzureBatchPoolDriverPortsList", "type": {"type": "array", "items": "string"}}
+        { "name": "AzureBatchPoolDriverPortsList", "type": {"type": "array", "items": "string"}},
+        { "name": "ContainerRegistryServer", "type": "string" },
+        { "name": "ContainerRegistryUsername", "type": "string" },
+        { "name": "ContainerRegistryPassword", "type": "string" },
+        { "name": "ContainerImageName", "type": "string" }
       ]
   }
 ]
diff --git a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/AzureBatchBootstrapREEFLauncher.java b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/AzureBatchBootstrapREEFLauncher.java
index 69e39f3..35a2e86 100644
--- a/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/AzureBatchBootstrapREEFLauncher.java
+++ b/lang/java/reef-bridge-client/src/main/java/org/apache/reef/bridge/client/AzureBatchBootstrapREEFLauncher.java
@@ -26,11 +26,7 @@
 import org.apache.reef.runtime.azbatch.AzureBatchClasspathProvider;
 import org.apache.reef.runtime.azbatch.AzureBatchJVMPathProvider;
 import org.apache.reef.runtime.azbatch.client.AzureBatchDriverConfigurationProviderImpl;
-import org.apache.reef.runtime.azbatch.parameters.AzureBatchAccountName;
-import org.apache.reef.runtime.azbatch.parameters.AzureBatchAccountUri;
-import org.apache.reef.runtime.azbatch.parameters.AzureBatchPoolId;
-import org.apache.reef.runtime.azbatch.parameters.AzureStorageAccountName;
-import org.apache.reef.runtime.azbatch.parameters.AzureStorageContainerName;
+import org.apache.reef.runtime.azbatch.parameters.*;
 import org.apache.reef.runtime.azbatch.util.command.CommandBuilder;
 import org.apache.reef.runtime.azbatch.util.command.WindowsCommandBuilder;
 import org.apache.reef.runtime.common.REEFEnvironment;
@@ -43,17 +39,13 @@
 import org.apache.reef.tang.*;
 import org.apache.reef.tang.exceptions.InjectionException;
 import org.apache.reef.wake.remote.RemoteConfiguration;
-import org.apache.reef.wake.remote.ports.ListTcpPortProvider;
-import org.apache.reef.wake.remote.ports.TcpPortProvider;
-import org.apache.reef.wake.remote.ports.parameters.TcpPortList;
+import org.apache.reef.wake.remote.address.LocalAddressProvider;
+import org.apache.reef.wake.remote.ports.parameters.TcpPortSet;
 import org.apache.reef.wake.time.Clock;
 
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -95,20 +87,24 @@
         TANG.newInjector(generateConfiguration(jobSubmissionParameters))
             .getInstance(AzureBatchBootstrapDriverConfigGenerator.class);
 
+    final LocalAddressProvider defaultLocalAddressProvider =
+        Tang.Factory.getTang().newInjector().getInstance(LocalAddressProvider.class);
+
     final JavaConfigurationBuilder launcherConfigBuilder =
         TANG.newConfigurationBuilder()
             .bindNamedParameter(RemoteConfiguration.ManagerName.class, "AzureBatchBootstrapREEFLauncher")
             .bindNamedParameter(RemoteConfiguration.ErrorHandler.class, REEFErrorHandler.class)
             .bindNamedParameter(RemoteConfiguration.MessageCodec.class, REEFMessageCodec.class)
+            .bindNamedParameter(RemoteConfiguration.HostAddress.class, defaultLocalAddressProvider.getLocalAddress())
             .bindSetEntry(Clock.RuntimeStartHandler.class, PIDStoreStartHandler.class);
 
+
     // Check if user has set up preferred ports to use.
     // If set, we prefer will launch driver that binds those ports.
-    final List<String> preferredPorts = asStringList(jobSubmissionParameters.getAzureBatchPoolDriverPortsList());
-
-    if (preferredPorts.size() > 0) {
-      launcherConfigBuilder.bindList(TcpPortList.class, preferredPorts)
-          .bindImplementation(TcpPortProvider.class, ListTcpPortProvider.class);
+    if (jobSubmissionParameters.getAzureBatchPoolDriverPortsList().size() > 0) {
+      for (CharSequence port : jobSubmissionParameters.getAzureBatchPoolDriverPortsList()) {
+        launcherConfigBuilder.bindSetEntry(TcpPortSet.class, port.toString());
+      }
     }
 
     final Configuration launcherConfig = launcherConfigBuilder.build();
@@ -156,17 +152,17 @@
             avroAzureBatchJobSubmissionParameters.getAzureStorageAccountName().toString())
         .bindNamedParameter(AzureStorageContainerName.class,
             avroAzureBatchJobSubmissionParameters.getAzureStorageContainerName().toString())
+        .bindNamedParameter(ContainerRegistryServer.class,
+            avroAzureBatchJobSubmissionParameters.getContainerRegistryServer().toString())
+        .bindNamedParameter(ContainerRegistryUsername.class,
+            avroAzureBatchJobSubmissionParameters.getContainerRegistryUsername().toString())
+        .bindNamedParameter(ContainerRegistryPassword.class,
+            avroAzureBatchJobSubmissionParameters.getContainerRegistryPassword().toString())
+        .bindNamedParameter(ContainerImageName.class,
+            avroAzureBatchJobSubmissionParameters.getContainerImageName().toString())
         .build();
   }
 
-  private static List<String> asStringList(final Collection<? extends CharSequence> list) {
-    final List<String> result = new ArrayList<>(list.size());
-    for (final CharSequence sequence : list) {
-      result.add(sequence.toString());
-    }
-    return result;
-  }
-
   private static RuntimeException fatal(final String msg, final Throwable t) {
     LOG.log(Level.SEVERE, msg, t);
     return new RuntimeException(msg, t);
diff --git a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameServerImpl.java b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameServerImpl.java
index d52644c..68b75f1 100644
--- a/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameServerImpl.java
+++ b/lang/java/reef-io/src/main/java/org/apache/reef/io/network/naming/NameServerImpl.java
@@ -33,6 +33,7 @@
 import org.apache.reef.wake.remote.RemoteConfiguration;
 import org.apache.reef.wake.remote.address.LocalAddressProvider;
 import org.apache.reef.wake.remote.impl.TransportEvent;
+import org.apache.reef.wake.remote.ports.TcpPortProvider;
 import org.apache.reef.wake.remote.transport.Transport;
 import org.apache.reef.wake.remote.transport.netty.NettyMessagingTransport;
 import org.apache.reef.webserver.ReefEventStateManager;
@@ -48,6 +49,7 @@
  */
 public final class NameServerImpl implements NameServer {
 
+  private static final String UNKNOWN_HOST_NAME = "##UNKNOWN##";
   private static final Logger LOG = Logger.getLogger(NameServer.class.getName());
 
   private final Transport transport;
@@ -64,8 +66,10 @@
    */
   @Inject
   private NameServerImpl(
+      @Parameter(RemoteConfiguration.HostAddress.class) final String hostAddress,
       @Parameter(NameServerParameters.NameServerPort.class) final int port,
       @Parameter(NameServerParameters.NameServerIdentifierFactory.class) final IdentifierFactory factory,
+      final TcpPortProvider portProvider,
       final LocalAddressProvider localAddressProvider) {
 
     final Injector injector = Tang.Factory.getTang().newInjector();
@@ -75,8 +79,10 @@
     final Codec<NamingMessage> codec = NamingCodecFactory.createFullCodec(factory);
     final EventHandler<NamingMessage> handler = createEventHandler(codec);
 
-    injector.bindVolatileParameter(RemoteConfiguration.HostAddress.class, localAddressProvider.getLocalAddress());
+    String host = UNKNOWN_HOST_NAME.equals(hostAddress) ? localAddressProvider.getLocalAddress() : hostAddress;
+    injector.bindVolatileParameter(RemoteConfiguration.HostAddress.class, host);
     injector.bindVolatileParameter(RemoteConfiguration.Port.class, port);
+    injector.bindVolatileInstance(TcpPortProvider.class, portProvider);
     injector.bindVolatileParameter(RemoteConfiguration.RemoteServerStage.class,
         new SyncStage<>(new NamingServerHandler(handler, codec)));
 
diff --git a/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/client/AzureBatchDriverConfigurationProviderImpl.java b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/client/AzureBatchDriverConfigurationProviderImpl.java
index 0887e9d..adab2cb 100644
--- a/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/client/AzureBatchDriverConfigurationProviderImpl.java
+++ b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/client/AzureBatchDriverConfigurationProviderImpl.java
@@ -21,20 +21,25 @@
 import org.apache.reef.annotations.audience.Private;
 import org.apache.reef.runtime.azbatch.driver.AzureBatchDriverConfiguration;
 import org.apache.reef.runtime.azbatch.driver.RuntimeIdentifier;
-import org.apache.reef.runtime.azbatch.parameters.AzureBatchAccountName;
-import org.apache.reef.runtime.azbatch.parameters.AzureBatchPoolId;
+import org.apache.reef.runtime.azbatch.parameters.*;
+import org.apache.reef.runtime.azbatch.util.batch.AzureBatchHelper;
+import org.apache.reef.runtime.azbatch.util.batch.ContainerRegistryProvider;
 import org.apache.reef.runtime.azbatch.util.command.CommandBuilder;
-import org.apache.reef.runtime.azbatch.parameters.AzureBatchAccountUri;
-import org.apache.reef.runtime.azbatch.parameters.AzureStorageAccountName;
-import org.apache.reef.runtime.azbatch.parameters.AzureStorageContainerName;
 import org.apache.reef.runtime.common.client.DriverConfigurationProvider;
 import org.apache.reef.runtime.common.parameters.JVMHeapSlack;
 import org.apache.reef.tang.Configuration;
 import org.apache.reef.tang.Configurations;
 import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.tang.formats.ConfigurationModuleBuilder;
+import org.apache.reef.wake.remote.address.ContainerBasedLocalAddressProvider;
+import org.apache.reef.wake.remote.address.LocalAddressProvider;
+import org.apache.reef.wake.remote.ports.SetTcpPortProvider;
+import org.apache.reef.wake.remote.ports.TcpPortProvider;
+import org.apache.reef.wake.remote.ports.parameters.TcpPortSet;
 
 import javax.inject.Inject;
 import java.net.URI;
+import java.util.Set;
 
 /**
  * Configuration provider for the Azure Batch runtime.
@@ -48,7 +53,9 @@
   private final String azureBatchPoolId;
   private final String azureStorageAccountName;
   private final String azureStorageContainerName;
+  private final ContainerRegistryProvider containerRegistryProvider;
   private final CommandBuilder commandBuilder;
+  private final Set<String> tcpPortSet;
 
   @Inject
   private AzureBatchDriverConfigurationProviderImpl(
@@ -58,6 +65,8 @@
       @Parameter(AzureBatchPoolId.class) final String azureBatchPoolId,
       @Parameter(AzureStorageAccountName.class) final String azureStorageAccountName,
       @Parameter(AzureStorageContainerName.class) final String azureStorageContainerName,
+      @Parameter(TcpPortSet.class) final Set<Integer> tcpPortSet,
+      final ContainerRegistryProvider containerRegistryProvider,
       final CommandBuilder commandBuilder) {
     this.jvmSlack = jvmSlack;
     this.azureBatchAccountUri = azureBatchAccountUri;
@@ -65,7 +74,11 @@
     this.azureBatchPoolId = azureBatchPoolId;
     this.azureStorageAccountName = azureStorageAccountName;
     this.azureStorageContainerName = azureStorageContainerName;
+    this.containerRegistryProvider = containerRegistryProvider;
     this.commandBuilder = commandBuilder;
+
+    // Binding a parameter to a set is only allowed for strings, so we cast to strings.
+    this.tcpPortSet = AzureBatchHelper.toStringSet(tcpPortSet);
   }
 
   /**
@@ -82,19 +95,36 @@
                                               final String clientRemoteId,
                                               final String jobId,
                                               final Configuration applicationConfiguration) {
-    return Configurations.merge(
-        AzureBatchDriverConfiguration.CONF.getBuilder()
-            .bindImplementation(CommandBuilder.class, this.commandBuilder.getClass()).build()
-            .set(AzureBatchDriverConfiguration.JOB_IDENTIFIER, jobId)
-            .set(AzureBatchDriverConfiguration.CLIENT_REMOTE_IDENTIFIER, clientRemoteId)
-            .set(AzureBatchDriverConfiguration.JVM_HEAP_SLACK, this.jvmSlack)
-            .set(AzureBatchDriverConfiguration.RUNTIME_NAME, RuntimeIdentifier.RUNTIME_NAME)
-            .set(AzureBatchDriverConfiguration.AZURE_BATCH_ACCOUNT_URI, this.azureBatchAccountUri)
-            .set(AzureBatchDriverConfiguration.AZURE_BATCH_ACCOUNT_NAME, this.azureBatchAccountName)
-            .set(AzureBatchDriverConfiguration.AZURE_BATCH_POOL_ID, this.azureBatchPoolId)
-            .set(AzureBatchDriverConfiguration.AZURE_STORAGE_ACCOUNT_NAME, this.azureStorageAccountName)
-            .set(AzureBatchDriverConfiguration.AZURE_STORAGE_CONTAINER_NAME, this.azureStorageContainerName)
-            .build(),
-        applicationConfiguration);
+    ConfigurationModuleBuilder driverConfigurationBuilder = AzureBatchDriverConfiguration.CONF.getBuilder()
+        .bindImplementation(CommandBuilder.class, this.commandBuilder.getClass());
+
+    // If using docker containers, then use a different set of bindings
+    if (this.containerRegistryProvider.isValid()) {
+      driverConfigurationBuilder = driverConfigurationBuilder
+          .bindImplementation(LocalAddressProvider.class, ContainerBasedLocalAddressProvider.class)
+          .bindImplementation(TcpPortProvider.class, SetTcpPortProvider.class);
+    }
+
+    final Configuration driverConfiguration = driverConfigurationBuilder.build()
+        .set(AzureBatchDriverConfiguration.JOB_IDENTIFIER, jobId)
+        .set(AzureBatchDriverConfiguration.CLIENT_REMOTE_IDENTIFIER, clientRemoteId)
+        .set(AzureBatchDriverConfiguration.JVM_HEAP_SLACK, this.jvmSlack)
+        .set(AzureBatchDriverConfiguration.RUNTIME_NAME, RuntimeIdentifier.RUNTIME_NAME)
+        .set(AzureBatchDriverConfiguration.AZURE_BATCH_ACCOUNT_URI, this.azureBatchAccountUri)
+        .set(AzureBatchDriverConfiguration.AZURE_BATCH_ACCOUNT_NAME, this.azureBatchAccountName)
+        .set(AzureBatchDriverConfiguration.AZURE_BATCH_POOL_ID, this.azureBatchPoolId)
+        .set(AzureBatchDriverConfiguration.AZURE_STORAGE_ACCOUNT_NAME, this.azureStorageAccountName)
+        .set(AzureBatchDriverConfiguration.AZURE_STORAGE_CONTAINER_NAME, this.azureStorageContainerName)
+        .set(AzureBatchDriverConfiguration.CONTAINER_REGISTRY_SERVER,
+            this.containerRegistryProvider.getContainerRegistryServer())
+        .set(AzureBatchDriverConfiguration.CONTAINER_REGISTRY_USERNAME,
+            this.containerRegistryProvider.getContainerRegistryUsername())
+        .set(AzureBatchDriverConfiguration.CONTAINER_REGISTRY_PASSWORD,
+            this.containerRegistryProvider.getContainerRegistryPassword())
+        .set(AzureBatchDriverConfiguration.CONTAINER_IMAGE_NAME,
+            this.containerRegistryProvider.getContainerImageName())
+        .setMultiple(AzureBatchDriverConfiguration.TCP_PORT_SET, this.tcpPortSet)
+        .build();
+    return Configurations.merge(driverConfiguration, applicationConfiguration);
   }
 }
diff --git a/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/client/AzureBatchJobSubmissionHandler.java b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/client/AzureBatchJobSubmissionHandler.java
index cbb8a8c..95e2d24 100644
--- a/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/client/AzureBatchJobSubmissionHandler.java
+++ b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/client/AzureBatchJobSubmissionHandler.java
@@ -93,7 +93,7 @@
    */
   @Override
   public void close() throws Exception {
-    LOG.log(Level.INFO, "Closing " + AzureBatchJobSubmissionHandler.class.getName());
+    LOG.log(Level.INFO, "Closing {0}", AzureBatchJobSubmissionHandler.class.getName());
   }
 
   /**
@@ -147,7 +147,7 @@
 
   private String createApplicationId(final JobSubmissionEvent jobSubmissionEvent) {
     String uuid = UUID.randomUUID().toString();
-    String jobIdentifier  = jobSubmissionEvent.getIdentifier();
+    String jobIdentifier = jobSubmissionEvent.getIdentifier();
     String jobNameShort = jobIdentifier.length() + 1 + uuid.length() < MAX_CHARS_JOB_NAME ?
         jobIdentifier : jobIdentifier.substring(0, MAX_CHARS_JOB_NAME - uuid.length() - 1);
     return jobNameShort + "-" + uuid;
diff --git a/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/client/AzureBatchRuntimeConfiguration.java b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/client/AzureBatchRuntimeConfiguration.java
index 541f8c4..7489211 100644
--- a/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/client/AzureBatchRuntimeConfiguration.java
+++ b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/client/AzureBatchRuntimeConfiguration.java
@@ -23,6 +23,7 @@
 import org.apache.reef.tang.formats.AvroConfigurationSerializer;
 
 import org.apache.reef.tang.formats.ConfigurationModuleBuilder;
+import org.apache.reef.tang.formats.OptionalParameter;
 import org.apache.reef.tang.formats.RequiredParameter;
 
 import java.io.File;
@@ -75,6 +76,31 @@
   public static final RequiredParameter<String> AZURE_STORAGE_CONTAINER_NAME = new RequiredParameter<>();
 
   /**
+   * Container Registry server.
+   */
+  public static final OptionalParameter<String> CONTAINER_REGISTRY_SERVER = new OptionalParameter<>();
+
+  /**
+   * Container Registry username.
+   */
+  public static final OptionalParameter<String> CONTAINER_REGISTRY_USERNAME = new OptionalParameter<>();
+
+  /**
+   * Container Registry password.
+   */
+  public static final OptionalParameter<String> CONTAINER_REGISTRY_PASSWORD = new OptionalParameter<>();
+
+  /**
+   * Container Image name.
+   */
+  public static final OptionalParameter<String> CONTAINER_IMAGE_NAME = new OptionalParameter<>();
+
+  /**
+   * Set of tcp ports.
+   */
+  public static final OptionalParameter<Integer> TCP_PORT_SET = new OptionalParameter<>();
+
+  /**
    * Create a {@link Configuration} object from an Avro configuration file.
    *
    * @param file the configuration file.
diff --git a/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/client/AzureBatchRuntimeConfigurationCreator.java b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/client/AzureBatchRuntimeConfigurationCreator.java
index 0d8860b..aa6e6d3 100644
--- a/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/client/AzureBatchRuntimeConfigurationCreator.java
+++ b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/client/AzureBatchRuntimeConfigurationCreator.java
@@ -19,18 +19,13 @@
 package org.apache.reef.runtime.azbatch.client;
 
 import org.apache.reef.annotations.audience.Private;
-import org.apache.reef.runtime.azbatch.parameters.AzureBatchAccountKey;
-import org.apache.reef.runtime.azbatch.parameters.AzureBatchAccountName;
-import org.apache.reef.runtime.azbatch.parameters.AzureBatchAccountUri;
-import org.apache.reef.runtime.azbatch.parameters.AzureBatchPoolId;
+import org.apache.reef.runtime.azbatch.parameters.*;
 import org.apache.reef.runtime.azbatch.util.command.CommandBuilder;
 import org.apache.reef.runtime.azbatch.util.command.LinuxCommandBuilder;
 import org.apache.reef.runtime.azbatch.util.command.WindowsCommandBuilder;
-import org.apache.reef.runtime.azbatch.parameters.AzureStorageAccountKey;
-import org.apache.reef.runtime.azbatch.parameters.AzureStorageAccountName;
-import org.apache.reef.runtime.azbatch.parameters.AzureStorageContainerName;
 import org.apache.reef.tang.formats.ConfigurationModule;
 import org.apache.reef.tang.formats.ConfigurationModuleBuilder;
+import org.apache.reef.wake.remote.ports.parameters.TcpPortSet;
 
 /**
  * Class that builds the ConfigurationModule for Azure Batch runtime.
@@ -49,27 +44,35 @@
    * @param isWindows true if Azure Batch pool nodes run Windows, false otherwise.
    * @return the configuration module object.
    */
-  public static ConfigurationModule getOrCreateAzureBatchRuntimeConfiguration(final boolean isWindows) {
+  public static ConfigurationModule getOrCreateAzureBatchRuntimeConfiguration(
+      final boolean isWindows) {
 
     if (AzureBatchRuntimeConfigurationCreator.conf == null) {
       ConfigurationModuleBuilder builder = AzureBatchRuntimeConfigurationStatic.CONF;
-      ConfigurationModule module;
+
       if (isWindows) {
-        module = builder.bindImplementation(CommandBuilder.class, WindowsCommandBuilder.class).build();
+        builder = builder.bindImplementation(CommandBuilder.class, WindowsCommandBuilder.class);
       } else {
-        module = builder.bindImplementation(CommandBuilder.class, LinuxCommandBuilder.class).build();
+        builder = builder.bindImplementation(CommandBuilder.class, LinuxCommandBuilder.class);
       }
 
       AzureBatchRuntimeConfigurationCreator.conf = new AzureBatchRuntimeConfiguration()
-          .merge(module)
+          .merge(builder.build())
           .bindNamedParameter(AzureBatchAccountName.class, AzureBatchRuntimeConfiguration.AZURE_BATCH_ACCOUNT_NAME)
           .bindNamedParameter(AzureBatchAccountUri.class, AzureBatchRuntimeConfiguration.AZURE_BATCH_ACCOUNT_URI)
           .bindNamedParameter(AzureBatchAccountKey.class, AzureBatchRuntimeConfiguration.AZURE_BATCH_ACCOUNT_KEY)
           .bindNamedParameter(AzureBatchPoolId.class, AzureBatchRuntimeConfiguration.AZURE_BATCH_POOL_ID)
           .bindNamedParameter(AzureStorageAccountName.class, AzureBatchRuntimeConfiguration.AZURE_STORAGE_ACCOUNT_NAME)
           .bindNamedParameter(AzureStorageAccountKey.class, AzureBatchRuntimeConfiguration.AZURE_STORAGE_ACCOUNT_KEY)
+          .bindNamedParameter(ContainerRegistryServer.class, AzureBatchRuntimeConfiguration.CONTAINER_REGISTRY_SERVER)
+          .bindNamedParameter(
+              ContainerRegistryUsername.class, AzureBatchRuntimeConfiguration.CONTAINER_REGISTRY_USERNAME)
+          .bindNamedParameter(
+              ContainerRegistryPassword.class, AzureBatchRuntimeConfiguration.CONTAINER_REGISTRY_PASSWORD)
+          .bindNamedParameter(ContainerImageName.class, AzureBatchRuntimeConfiguration.CONTAINER_IMAGE_NAME)
           .bindNamedParameter(
               AzureStorageContainerName.class, AzureBatchRuntimeConfiguration.AZURE_STORAGE_CONTAINER_NAME)
+          .bindSetEntry(TcpPortSet.class, AzureBatchRuntimeConfiguration.TCP_PORT_SET)
           .build();
     }
 
diff --git a/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/client/AzureBatchRuntimeConfigurationProvider.java b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/client/AzureBatchRuntimeConfigurationProvider.java
index c78e9e3..ce62be4 100644
--- a/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/client/AzureBatchRuntimeConfigurationProvider.java
+++ b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/client/AzureBatchRuntimeConfigurationProvider.java
@@ -20,17 +20,20 @@
 
 import org.apache.reef.annotations.audience.Public;
 import org.apache.reef.runtime.azbatch.parameters.*;
+import org.apache.reef.runtime.azbatch.util.batch.AzureBatchHelper;
+import org.apache.reef.runtime.azbatch.util.batch.ContainerRegistryProvider;
 import org.apache.reef.tang.Configuration;
 import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.wake.remote.ports.parameters.TcpPortSet;
 
 import javax.inject.Inject;
+import java.util.Set;
 
 /**
  * Class that provides the runtime configuration for Azure Batch.
  */
 @Public
 public final class AzureBatchRuntimeConfigurationProvider {
-
   private final String azureBatchAccountName;
   private final String azureBatchAccountKey;
   private final String azureBatchAccountUri;
@@ -38,7 +41,9 @@
   private final String azureStorageAccountName;
   private final String azureStorageAccountKey;
   private final String azureStorageContainerName;
+  private final ContainerRegistryProvider containerRegistryProvider;
   private final Boolean isWindows;
+  private final Set<String> tcpPortSet;
 
   /**
    * Private constructor.
@@ -52,7 +57,9 @@
       @Parameter(AzureStorageAccountName.class) final String azureStorageAccountName,
       @Parameter(AzureStorageAccountKey.class) final String azureStorageAccountKey,
       @Parameter(AzureStorageContainerName.class) final String azureStorageContainerName,
-      @Parameter(IsWindows.class) final Boolean isWindows) {
+      @Parameter(IsWindows.class) final Boolean isWindows,
+      @Parameter(TcpPortSet.class) final Set<Integer> tcpPortSet,
+      final ContainerRegistryProvider containerRegistryProvider) {
     this.azureBatchAccountName = azureBatchAccountName;
     this.azureBatchAccountKey = azureBatchAccountKey;
     this.azureBatchAccountUri = azureBatchAccountUri;
@@ -60,7 +67,11 @@
     this.azureStorageAccountName = azureStorageAccountName;
     this.azureStorageAccountKey = azureStorageAccountKey;
     this.azureStorageContainerName = azureStorageContainerName;
+    this.containerRegistryProvider = containerRegistryProvider;
     this.isWindows = isWindows;
+
+    // Binding a parameter to a set is only allowed for strings, so we cast to strings.
+    this.tcpPortSet = AzureBatchHelper.toStringSet(tcpPortSet);
   }
 
   public Configuration getAzureBatchRuntimeConfiguration() {
@@ -73,6 +84,15 @@
         .set(AzureBatchRuntimeConfiguration.AZURE_STORAGE_ACCOUNT_NAME, this.azureStorageAccountName)
         .set(AzureBatchRuntimeConfiguration.AZURE_STORAGE_ACCOUNT_KEY, this.azureStorageAccountKey)
         .set(AzureBatchRuntimeConfiguration.AZURE_STORAGE_CONTAINER_NAME, this.azureStorageContainerName)
+        .set(AzureBatchRuntimeConfiguration.CONTAINER_REGISTRY_SERVER,
+            this.containerRegistryProvider.getContainerRegistryServer())
+        .set(AzureBatchRuntimeConfiguration.CONTAINER_REGISTRY_USERNAME,
+            this.containerRegistryProvider.getContainerRegistryUsername())
+        .set(AzureBatchRuntimeConfiguration.CONTAINER_REGISTRY_PASSWORD,
+            this.containerRegistryProvider.getContainerRegistryPassword())
+        .set(AzureBatchRuntimeConfiguration.CONTAINER_IMAGE_NAME,
+            this.containerRegistryProvider.getContainerImageName())
+        .setMultiple(AzureBatchRuntimeConfiguration.TCP_PORT_SET, this.tcpPortSet)
         .build();
   }
 }
diff --git a/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/driver/AzureBatchDriverConfiguration.java b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/driver/AzureBatchDriverConfiguration.java
index 9a096be..b8b8f18 100644
--- a/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/driver/AzureBatchDriverConfiguration.java
+++ b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/driver/AzureBatchDriverConfiguration.java
@@ -41,6 +41,7 @@
 import org.apache.reef.tang.formats.ConfigurationModuleBuilder;
 import org.apache.reef.tang.formats.OptionalParameter;
 import org.apache.reef.tang.formats.RequiredParameter;
+import org.apache.reef.wake.remote.ports.parameters.TcpPortSet;
 
 /**
  * ConfigurationModule to create Azure Batch Driver configurations.
@@ -95,6 +96,31 @@
   public static final RequiredParameter<String> AZURE_STORAGE_CONTAINER_NAME = new RequiredParameter<>();
 
   /**
+   * Container Registry Server.
+   */
+  public static final OptionalParameter<String> CONTAINER_REGISTRY_SERVER = new OptionalParameter<>();
+
+  /**
+   * Container Registry Username.
+   */
+  public static final OptionalParameter<String> CONTAINER_REGISTRY_USERNAME = new OptionalParameter<>();
+
+  /**
+   * Container Registry password.
+   */
+  public static final OptionalParameter<String> CONTAINER_REGISTRY_PASSWORD = new OptionalParameter<>();
+
+  /**
+   * Container Image name.
+   */
+  public static final OptionalParameter<String> CONTAINER_IMAGE_NAME = new OptionalParameter<>();
+
+  /**
+   * Set of TCP Ports.
+   */
+  public static final OptionalParameter<Integer> TCP_PORT_SET = new OptionalParameter<>();
+
+  /**
    * The fraction of the container memory NOT to use for the Java Heap.
    */
   public static final OptionalParameter<Double> JVM_HEAP_SLACK = new OptionalParameter<>();
@@ -115,6 +141,12 @@
       .bindNamedParameter(AzureStorageAccountName.class, AZURE_STORAGE_ACCOUNT_NAME)
       .bindNamedParameter(AzureStorageContainerName.class, AZURE_STORAGE_CONTAINER_NAME)
 
+      // Bind Azure Container Parameters
+      .bindNamedParameter(ContainerRegistryServer.class, CONTAINER_REGISTRY_SERVER)
+      .bindNamedParameter(ContainerRegistryUsername.class, CONTAINER_REGISTRY_USERNAME)
+      .bindNamedParameter(ContainerRegistryPassword.class, CONTAINER_REGISTRY_PASSWORD)
+      .bindNamedParameter(ContainerImageName.class, CONTAINER_IMAGE_NAME)
+
       // Bind the fields bound in AbstractDriverRuntimeConfiguration
       .bindNamedParameter(JobIdentifier.class, JOB_IDENTIFIER)
       .bindNamedParameter(LaunchID.class, JOB_IDENTIFIER)
@@ -125,5 +157,6 @@
       .bindImplementation(RuntimeClasspathProvider.class, AzureBatchClasspathProvider.class)
       .bindImplementation(RuntimePathProvider.class, AzureBatchJVMPathProvider.class)
       .bindSetEntry(DefinedRuntimes.class, RUNTIME_NAME)
+      .bindSetEntry(TcpPortSet.class, TCP_PORT_SET)
       .build();
 }
diff --git a/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/driver/AzureBatchEvaluatorShimConfigurationProvider.java b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/driver/AzureBatchEvaluatorShimConfigurationProvider.java
index 85b9103..e3d8b1b 100644
--- a/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/driver/AzureBatchEvaluatorShimConfigurationProvider.java
+++ b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/driver/AzureBatchEvaluatorShimConfigurationProvider.java
@@ -20,10 +20,15 @@
 
 import org.apache.reef.annotations.audience.Private;
 import org.apache.reef.runtime.azbatch.evaluator.EvaluatorShimConfiguration;
+import org.apache.reef.runtime.azbatch.util.batch.AzureBatchHelper;
+import org.apache.reef.runtime.azbatch.util.batch.ContainerRegistryProvider;
 import org.apache.reef.runtime.common.utils.RemoteManager;
 import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.wake.remote.ports.parameters.TcpPortSet;
 
 import javax.inject.Inject;
+import java.util.Set;
 
 /**
  * Configuration provider for the Azure Batch evaluator shim.
@@ -32,10 +37,18 @@
 public class AzureBatchEvaluatorShimConfigurationProvider {
 
   private final RemoteManager remoteManager;
+  private final Set<String> tcpPortSet;
+  private final ContainerRegistryProvider containerRegistryProvider;
 
   @Inject
-  AzureBatchEvaluatorShimConfigurationProvider(final RemoteManager remoteManager) {
+  AzureBatchEvaluatorShimConfigurationProvider(
+      @Parameter(TcpPortSet.class) final Set<Integer> tcpPortSet,
+      final ContainerRegistryProvider containerRegistryProvider,
+      final RemoteManager remoteManager) {
     this.remoteManager = remoteManager;
+    this.containerRegistryProvider = containerRegistryProvider;
+    // Binding a parameter to a set is only allowed for strings, so we cast to strings.
+    this.tcpPortSet = AzureBatchHelper.toStringSet(tcpPortSet);
   }
 
   /**
@@ -46,9 +59,13 @@
    * @return A {@link Configuration} object needed to launch the evaluator shim.
    */
   public Configuration getConfiguration(final String containerId) {
-    return EvaluatorShimConfiguration.CONF
+
+    return EvaluatorShimConfiguration
+        .getConfigurationModule(this.containerRegistryProvider.isValid())
         .set(EvaluatorShimConfiguration.DRIVER_REMOTE_IDENTIFIER, this.remoteManager.getMyIdentifier())
         .set(EvaluatorShimConfiguration.CONTAINER_IDENTIFIER, containerId)
+        .setMultiple(EvaluatorShimConfiguration.TCP_PORT_SET, this.tcpPortSet)
         .build();
   }
 }
+
diff --git a/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/evaluator/EvaluatorShimConfiguration.java b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/evaluator/EvaluatorShimConfiguration.java
index c114ba2..be6428e 100644
--- a/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/evaluator/EvaluatorShimConfiguration.java
+++ b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/evaluator/EvaluatorShimConfiguration.java
@@ -25,8 +25,14 @@
 import org.apache.reef.runtime.common.launch.REEFMessageCodec;
 import org.apache.reef.tang.formats.ConfigurationModule;
 import org.apache.reef.tang.formats.ConfigurationModuleBuilder;
+import org.apache.reef.tang.formats.OptionalParameter;
 import org.apache.reef.tang.formats.RequiredParameter;
 import org.apache.reef.wake.remote.RemoteConfiguration;
+import org.apache.reef.wake.remote.address.ContainerBasedLocalAddressProvider;
+import org.apache.reef.wake.remote.address.LocalAddressProvider;
+import org.apache.reef.wake.remote.ports.SetTcpPortProvider;
+import org.apache.reef.wake.remote.ports.TcpPortProvider;
+import org.apache.reef.wake.remote.ports.parameters.TcpPortSet;
 
 /**
  * ConfigurationModule to create evaluator shim configurations.
@@ -45,9 +51,28 @@
    */
   public static final RequiredParameter<String> CONTAINER_IDENTIFIER = new RequiredParameter<>();
 
-  public static final ConfigurationModule CONF = new EvaluatorShimConfiguration()
+  /**
+   * Set of TCP Ports.
+   */
+  public static final OptionalParameter<Integer> TCP_PORT_SET = new OptionalParameter<>();
+
+  private static final ConfigurationModule CONF = new EvaluatorShimConfiguration()
       .bindNamedParameter(RemoteConfiguration.MessageCodec.class, REEFMessageCodec.class)
       .bindNamedParameter(DriverRemoteIdentifier.class, DRIVER_REMOTE_IDENTIFIER)
       .bindNamedParameter(ContainerIdentifier.class, CONTAINER_IDENTIFIER)
+      .bindSetEntry(TcpPortSet.class, TCP_PORT_SET)
       .build();
+
+  public static ConfigurationModule getConfigurationModule(final boolean includeContainerConfiguration) {
+    ConfigurationModuleBuilder shimConfigurationBuilder = EvaluatorShimConfiguration.CONF.getBuilder();
+
+    // If using docker containers, then use a different set of bindings
+    if (includeContainerConfiguration) {
+      shimConfigurationBuilder = shimConfigurationBuilder
+          .bindImplementation(LocalAddressProvider.class, ContainerBasedLocalAddressProvider.class)
+          .bindImplementation(TcpPortProvider.class, SetTcpPortProvider.class);
+    }
+
+    return shimConfigurationBuilder.build();
+  }
 }
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/parameters/TcpPortList.java b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/parameters/ContainerImageName.java
similarity index 71%
copy from lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/parameters/TcpPortList.java
copy to lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/parameters/ContainerImageName.java
index c7f4fbf..3d9ad07 100644
--- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/parameters/TcpPortList.java
+++ b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/parameters/ContainerImageName.java
@@ -16,22 +16,14 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.reef.wake.remote.ports.parameters;
+package org.apache.reef.runtime.azbatch.parameters;
 
 import org.apache.reef.tang.annotations.Name;
 import org.apache.reef.tang.annotations.NamedParameter;
 
-import java.util.List;
-
 /**
- * An list of tcp port numbers to try.
+ * The container registry image name.
  */
-@NamedParameter(doc = "An list of tcp port numbers to try")
-public final class TcpPortList implements Name<List<Integer>> {
-
-  /**
-   * Empty private constructor to prohibit instantiation of utility class.
-   */
-  private TcpPortList() {
-  }
+@NamedParameter(doc = "The container image name.", default_value = "")
+public final class ContainerImageName implements Name<String> {
 }
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/parameters/TcpPortList.java b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/parameters/ContainerRegistryPassword.java
similarity index 70%
copy from lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/parameters/TcpPortList.java
copy to lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/parameters/ContainerRegistryPassword.java
index c7f4fbf..acff823 100644
--- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/parameters/TcpPortList.java
+++ b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/parameters/ContainerRegistryPassword.java
@@ -16,22 +16,17 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.reef.wake.remote.ports.parameters;
+package org.apache.reef.runtime.azbatch.parameters;
 
 import org.apache.reef.tang.annotations.Name;
 import org.apache.reef.tang.annotations.NamedParameter;
 
-import java.util.List;
+// TODO[JIRA REEF-TBD]: Secure the password by encrypting it and using
+// Azure Batch certificates to decrypt on the driver side.
 
 /**
- * An list of tcp port numbers to try.
+ * The container registry password.
  */
-@NamedParameter(doc = "An list of tcp port numbers to try")
-public final class TcpPortList implements Name<List<Integer>> {
-
-  /**
-   * Empty private constructor to prohibit instantiation of utility class.
-   */
-  private TcpPortList() {
-  }
+@NamedParameter(doc = "The container registry password.", default_value = "")
+public final class ContainerRegistryPassword implements Name<String> {
 }
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/parameters/TcpPortList.java b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/parameters/ContainerRegistryServer.java
similarity index 71%
copy from lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/parameters/TcpPortList.java
copy to lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/parameters/ContainerRegistryServer.java
index c7f4fbf..a001869 100644
--- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/parameters/TcpPortList.java
+++ b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/parameters/ContainerRegistryServer.java
@@ -16,22 +16,14 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.reef.wake.remote.ports.parameters;
+package org.apache.reef.runtime.azbatch.parameters;
 
 import org.apache.reef.tang.annotations.Name;
 import org.apache.reef.tang.annotations.NamedParameter;
 
-import java.util.List;
-
 /**
- * An list of tcp port numbers to try.
+ * The container registry server name.
  */
-@NamedParameter(doc = "An list of tcp port numbers to try")
-public final class TcpPortList implements Name<List<Integer>> {
-
-  /**
-   * Empty private constructor to prohibit instantiation of utility class.
-   */
-  private TcpPortList() {
-  }
+@NamedParameter(doc = "The container registry server name.", default_value = "")
+public final class ContainerRegistryServer implements Name<String> {
 }
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/parameters/TcpPortList.java b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/parameters/ContainerRegistryUsername.java
similarity index 71%
copy from lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/parameters/TcpPortList.java
copy to lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/parameters/ContainerRegistryUsername.java
index c7f4fbf..fe6b82e 100644
--- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/parameters/TcpPortList.java
+++ b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/parameters/ContainerRegistryUsername.java
@@ -16,22 +16,14 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.reef.wake.remote.ports.parameters;
+package org.apache.reef.runtime.azbatch.parameters;
 
 import org.apache.reef.tang.annotations.Name;
 import org.apache.reef.tang.annotations.NamedParameter;
 
-import java.util.List;
-
 /**
- * An list of tcp port numbers to try.
+ * The container registry user name.
  */
-@NamedParameter(doc = "An list of tcp port numbers to try")
-public final class TcpPortList implements Name<List<Integer>> {
-
-  /**
-   * Empty private constructor to prohibit instantiation of utility class.
-   */
-  private TcpPortList() {
-  }
+@NamedParameter(doc = "The container registry user name.", default_value = "")
+public final class ContainerRegistryUsername implements Name<String> {
 }
diff --git a/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/util/batch/AzureBatchHelper.java b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/util/batch/AzureBatchHelper.java
index e0753df..f6e59ef 100644
--- a/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/util/batch/AzureBatchHelper.java
+++ b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/util/batch/AzureBatchHelper.java
@@ -21,18 +21,18 @@
 import com.microsoft.azure.batch.BatchClient;
 import com.microsoft.azure.batch.protocol.models.*;
 
-import org.apache.reef.runtime.azbatch.client.AzureBatchJobSubmissionHandler;
-import org.apache.reef.runtime.azbatch.parameters.AzureBatchPoolId;
+import org.apache.reef.runtime.azbatch.parameters.*;
 import org.apache.reef.runtime.azbatch.util.AzureBatchFileNames;
+import org.apache.reef.runtime.azbatch.util.command.CommandBuilder;
 import org.apache.reef.runtime.azbatch.util.storage.SharedAccessSignatureCloudBlobClientProvider;
 import org.apache.reef.tang.annotations.Parameter;
+import org.apache.reef.wake.remote.address.ContainerBasedLocalAddressProvider;
+import org.apache.reef.wake.remote.ports.parameters.TcpPortSet;
 
 import javax.inject.Inject;
 import java.io.IOException;
 import java.net.URI;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
+import java.util.*;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -41,27 +41,39 @@
  */
 public final class AzureBatchHelper {
 
-  private static final Logger LOG = Logger.getLogger(AzureBatchJobSubmissionHandler.class.getName());
+  private static final Logger LOG = Logger.getLogger(AzureBatchHelper.class.getName());
 
   /*
    * Environment variable that contains the Azure Batch jobId.
    */
   private static final String AZ_BATCH_JOB_ID_ENV = "AZ_BATCH_JOB_ID";
+  private static final String CAPTURE_HOST_IP_ADDRESS_TASK_NAME = "CaptureHostIpAddress";
 
   private final AzureBatchFileNames azureBatchFileNames;
 
   private final BatchClient client;
   private final PoolInformation poolInfo;
+  private final CommandBuilder commandBuilder;
+  private final ContainerRegistryProvider containerRegistryProvider;
+  private final boolean areContainersEnabled;
+  private final Set<Integer> ports;
 
   @Inject
   public AzureBatchHelper(
       final AzureBatchFileNames azureBatchFileNames,
       final IAzureBatchCredentialProvider credentialProvider,
+      final CommandBuilder commandBuilder,
+      final ContainerRegistryProvider containerRegistryProvider,
+      @Parameter(TcpPortSet.class) final Set<Integer> ports,
       @Parameter(AzureBatchPoolId.class) final String azureBatchPoolId) {
     this.azureBatchFileNames = azureBatchFileNames;
 
     this.client = BatchClient.open(credentialProvider.getCredentials());
     this.poolInfo = new PoolInformation().withPoolId(azureBatchPoolId);
+    this.commandBuilder = commandBuilder;
+    this.containerRegistryProvider = containerRegistryProvider;
+    this.ports = ports;
+    this.areContainersEnabled = this.containerRegistryProvider.isValid();
   }
 
   /**
@@ -75,7 +87,7 @@
    */
   public void submitJob(final String applicationId, final String storageContainerSAS, final URI jobJarUri,
                         final String command) throws IOException {
-    ResourceFile jarResourceFile = new ResourceFile()
+    final ResourceFile jarResourceFile = new ResourceFile()
         .withBlobSource(jobJarUri.toString())
         .withFilePath(AzureBatchFileNames.getTaskJarFileName());
 
@@ -83,26 +95,29 @@
     // as an environment variable.
     // See https://docs.microsoft.com/en-us/dotnet/api/microsoft.azure.batch.cloudtask.authenticationtokensettings
     // for more info.
-    AuthenticationTokenSettings authenticationTokenSettings = new AuthenticationTokenSettings();
-    authenticationTokenSettings.withAccess(Collections.singletonList(AccessScope.JOB));
+    final AuthenticationTokenSettings authenticationTokenSettings = new AuthenticationTokenSettings()
+        .withAccess(Collections.singletonList(AccessScope.JOB));
 
-    EnvironmentSetting environmentSetting = new EnvironmentSetting()
+    final EnvironmentSetting environmentSetting = new EnvironmentSetting()
         .withName(SharedAccessSignatureCloudBlobClientProvider.AZURE_STORAGE_CONTAINER_SAS_TOKEN_ENV)
         .withValue(storageContainerSAS);
 
-    JobManagerTask jobManagerTask = new JobManagerTask()
+    final JobManagerTask jobManagerTask = new JobManagerTask()
         .withRunExclusive(false)
         .withId(applicationId)
         .withResourceFiles(Collections.singletonList(jarResourceFile))
         .withEnvironmentSettings(Collections.singletonList(environmentSetting))
         .withAuthenticationTokenSettings(authenticationTokenSettings)
+        .withKillJobOnCompletion(true)
+        .withContainerSettings(createTaskContainerSettings(applicationId))
         .withCommandLine(command);
 
-    LOG.log(Level.INFO, "Job Manager (aka driver) task command: " + command);
+    LOG.log(Level.INFO, "Job Manager (aka driver) task command: {0}", command);
 
-    JobAddParameter jobAddParameter = new JobAddParameter()
+    final JobAddParameter jobAddParameter = new JobAddParameter()
         .withId(applicationId)
         .withJobManagerTask(jobManagerTask)
+        .withJobPreparationTask(createJobPreparationTask())
         .withPoolInfo(poolInfo);
 
     client.jobOperations().createJob(jobAddParameter);
@@ -134,13 +149,19 @@
         .withFilePath(this.azureBatchFileNames.getEvaluatorShimConfigurationPath());
     resources.add(confSourceFile);
 
-    LOG.log(Level.INFO, "Evaluator task command: " + command);
+    LOG.log(Level.INFO, "Evaluator task command: {0}", command);
 
-    final TaskAddParameter taskAddParameter = new TaskAddParameter()
+    TaskAddParameter taskAddParameter = new TaskAddParameter()
         .withId(taskId)
         .withResourceFiles(resources)
+        .withContainerSettings(createTaskContainerSettings(taskId))
         .withCommandLine(command);
 
+    if (this.areContainersEnabled) {
+      taskAddParameter = taskAddParameter.withUserIdentity(
+          new UserIdentity().withAutoUser(new AutoUserSpecification().withElevationLevel(ElevationLevel.ADMIN)));
+    }
+
     this.client.taskOperations().createTask(jobId, taskAddParameter);
   }
 
@@ -169,4 +190,44 @@
   public String getAzureBatchJobId() {
     return System.getenv(AZ_BATCH_JOB_ID_ENV);
   }
+
+  private TaskContainerSettings createTaskContainerSettings(final String dockerContainerId) {
+    if (!this.areContainersEnabled) {
+      return null;
+    }
+
+    final StringBuilder runOptions = new StringBuilder(String.format(
+        "-d --rm --name %s --env %s=%s ",
+        dockerContainerId,
+        ContainerBasedLocalAddressProvider.HOST_IP_ADDR_PATH_ENV,
+        this.commandBuilder.getIpAddressFilePath()));
+
+    for (final int port : this.ports) {
+      runOptions.append(String.format("-p %d:%d ", port, port));
+    }
+
+    return new TaskContainerSettings()
+        .withRegistry(this.containerRegistryProvider.getContainerRegistry())
+        .withImageName(this.containerRegistryProvider.getContainerImageName())
+        .withContainerRunOptions(runOptions.toString());
+  }
+
+  private JobPreparationTask createJobPreparationTask() {
+    if (!this.areContainersEnabled) {
+      return null;
+    }
+
+    final String captureIpAddressCommandLine = this.commandBuilder.captureIpAddressCommandLine();
+    return new JobPreparationTask()
+        .withId(CAPTURE_HOST_IP_ADDRESS_TASK_NAME)
+        .withCommandLine(captureIpAddressCommandLine);
+  }
+
+  public static Set<String> toStringSet(final Collection<?> items) {
+    final Set<String> set = new HashSet<>(items.size());
+    for (final Object elem: items) {
+      set.add(elem.toString());
+    }
+    return Collections.unmodifiableSet(set);
+  }
 }
diff --git a/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/util/batch/ContainerRegistryProvider.java b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/util/batch/ContainerRegistryProvider.java
new file mode 100644
index 0000000..1bfafa9
--- /dev/null
+++ b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/util/batch/ContainerRegistryProvider.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.runtime.azbatch.util.batch;
+
+import com.microsoft.azure.batch.protocol.models.ContainerRegistry;
+import org.apache.commons.lang.StringUtils;
+import org.apache.reef.runtime.azbatch.parameters.ContainerImageName;
+import org.apache.reef.runtime.azbatch.parameters.ContainerRegistryPassword;
+import org.apache.reef.runtime.azbatch.parameters.ContainerRegistryServer;
+import org.apache.reef.runtime.azbatch.parameters.ContainerRegistryUsername;
+import org.apache.reef.tang.annotations.Parameter;
+
+import javax.inject.Inject;
+
+/**
+ * Class that holds the settings of the Azure container service settings.
+ */
+public final class ContainerRegistryProvider {
+
+  private final String containerRegistryServer;
+  private final String containerRegistryUsername;
+  private final String containerRegistryPassword;
+  private final String containerImageName;
+
+  @Inject
+  private ContainerRegistryProvider(
+      @Parameter(ContainerRegistryServer.class) final String containerRegistryServer,
+      @Parameter(ContainerRegistryUsername.class) final String containerRegistryUsername,
+      @Parameter(ContainerRegistryPassword.class) final String containerRegistryPassword,
+      @Parameter(ContainerImageName.class) final String containerImageName
+  ) {
+    this.containerRegistryServer = containerRegistryServer;
+    this.containerRegistryUsername = containerRegistryUsername;
+    this.containerRegistryPassword = containerRegistryPassword;
+    this.containerImageName = containerImageName;
+  }
+
+  public boolean isValid() {
+    return !StringUtils.isEmpty(this.containerRegistryServer);
+  }
+
+  public String getContainerRegistryServer() {
+    return this.containerRegistryServer;
+  }
+
+  public String getContainerRegistryUsername() {
+    return this.containerRegistryUsername;
+  }
+
+  public String getContainerRegistryPassword() {
+    return this.containerRegistryPassword;
+  }
+
+  public String getContainerImageName() {
+    return this.containerImageName;
+  }
+
+  public ContainerRegistry getContainerRegistry() {
+    if (!this.isValid()) {
+      throw new RuntimeException("Container registry is invalid");
+    }
+
+    return new ContainerRegistry()
+        .withRegistryServer(this.getContainerRegistryServer())
+        .withUserName(this.getContainerRegistryUsername())
+        .withPassword(this.getContainerRegistryPassword());
+  }
+}
diff --git a/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/util/command/CommandBuilder.java b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/util/command/CommandBuilder.java
index 96cd716..b05e010 100644
--- a/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/util/command/CommandBuilder.java
+++ b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/util/command/CommandBuilder.java
@@ -53,4 +53,18 @@
    */
   String buildEvaluatorCommand(final ResourceLaunchEvent resourceLaunchEvent,
                                final int containerMemory, final double jvmHeapFactor);
+
+  /**
+   * Returns the path to a file where the ip address is persisted.
+   *
+   * @return path to the file.
+   */
+  String getIpAddressFilePath();
+
+  /**
+   * Returns a command line that saves the ip address of the host to a file.
+   *
+   * @return command line string.
+   */
+  String captureIpAddressCommandLine();
 }
diff --git a/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/util/command/LinuxCommandBuilder.java b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/util/command/LinuxCommandBuilder.java
index adb58f5..0865662 100644
--- a/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/util/command/LinuxCommandBuilder.java
+++ b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/util/command/LinuxCommandBuilder.java
@@ -65,4 +65,15 @@
   protected String getEvaluatorShimClasspath() {
     return StringUtils.join(super.classpathProvider.getEvaluatorClasspath(), CLASSPATH_SEPARATOR_CHAR);
   }
+
+  @Override
+  public String getIpAddressFilePath() {
+    return "$AZ_BATCH_JOB_PREP_WORKING_DIR/hostip.txt";
+  }
+
+  @Override
+  public String captureIpAddressCommandLine() {
+    final String filePath = getIpAddressFilePath();
+    return String.format("/bin/bash -c \"rm -f %s; echo `hostname -i` > %s\"", filePath, filePath);
+  }
 }
diff --git a/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/util/command/WindowsCommandBuilder.java b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/util/command/WindowsCommandBuilder.java
index f382bef..ea0e097 100644
--- a/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/util/command/WindowsCommandBuilder.java
+++ b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/util/command/WindowsCommandBuilder.java
@@ -71,4 +71,16 @@
     return String.format("'%s'", StringUtils.join(
         super.classpathProvider.getEvaluatorClasspath(), CLASSPATH_SEPARATOR_CHAR));
   }
+
+  @Override
+  public String getIpAddressFilePath() {
+    return "%AZ_BATCH_JOB_PREP_WORKING_DIR%\\hostip.txt";
+  }
+
+  @Override
+  public String captureIpAddressCommandLine() {
+    return String.format("powershell /c \"Set-Content -Path %s -Value "
+        + "((Test-Connection -ComputerName $Env:ComputerName -Count 1).IPV4Address.IPAddressToString) "
+        + " -NoNewline -Force\"", getIpAddressFilePath());
+  }
 }
diff --git a/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/util/storage/SharedAccessSignatureCloudBlobClientProvider.java b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/util/storage/SharedAccessSignatureCloudBlobClientProvider.java
index e678445..a087488 100644
--- a/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/util/storage/SharedAccessSignatureCloudBlobClientProvider.java
+++ b/lang/java/reef-runtime-azbatch/src/main/java/org/apache/reef/runtime/azbatch/util/storage/SharedAccessSignatureCloudBlobClientProvider.java
@@ -100,7 +100,7 @@
       }
 
       URI result = builder.addToURI(uri);
-      LOG.log(Level.INFO, "Here's the URI: " + result);
+      LOG.log(Level.INFO, "Here's the URI: {0}", result);
 
       return result;
     } catch (StorageException | URISyntaxException e) {
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/address/ContainerBasedLocalAddressProvider.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/address/ContainerBasedLocalAddressProvider.java
new file mode 100644
index 0000000..2b63935
--- /dev/null
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/address/ContainerBasedLocalAddressProvider.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.reef.wake.remote.address;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.reef.tang.Configuration;
+import org.apache.reef.tang.Tang;
+
+import javax.inject.Inject;
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * A LocalAddressProvider that reads the contents of the file at HOST_IP_ADDR_PATH and uses it to be the ip address.
+ */
+public final class ContainerBasedLocalAddressProvider implements LocalAddressProvider {
+
+  private static final Pattern IPADDRESS_PATTERN = Pattern.compile(
+      "(?:(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\\.){3}(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)");
+
+  public static final String HOST_IP_ADDR_PATH_ENV = "HOST_IP_ADDR_PATH";
+  private static final Logger LOG = Logger.getLogger(ContainerBasedLocalAddressProvider.class.getName());
+
+  private String cachedLocalAddress = null;
+
+  /**
+   * The constructor is for Tang only.
+   */
+  @Inject
+  private ContainerBasedLocalAddressProvider() {
+    LOG.log(Level.FINE, "Instantiating ContainerBasedLocalAddressProvider");
+  }
+
+  @Override
+  public synchronized String getLocalAddress() {
+    if (cachedLocalAddress != null) {
+      return cachedLocalAddress;
+    }
+
+    final String ipAddressPath = System.getenv(HOST_IP_ADDR_PATH_ENV);
+    LOG.log(Level.FINE, "IpAddressPath is {0}", ipAddressPath);
+    if (StringUtils.isEmpty(ipAddressPath)) {
+      final String message = String.format("Environment variable must be set for %s", HOST_IP_ADDR_PATH_ENV);
+      LOG.log(Level.SEVERE, message);
+      throw new RuntimeException(message);
+    }
+
+    final File ipAddressFile = new File(ipAddressPath);
+    if (!ipAddressFile.exists() || !ipAddressFile.isFile()) {
+      final String message = String.format("%s points to invalid path: %s", HOST_IP_ADDR_PATH_ENV, ipAddressPath);
+      LOG.log(Level.SEVERE, message);
+      throw new RuntimeException(message);
+    }
+
+    try {
+      cachedLocalAddress = readFile(ipAddressPath, StandardCharsets.UTF_8);
+      return cachedLocalAddress;
+    } catch (IOException e) {
+      final String message = String.format("Exception when attempting to read file %s", ipAddressPath);
+      LOG.log(Level.SEVERE, message, e);
+      throw new RuntimeException(message, e);
+    }
+  }
+
+  @Override
+  public Configuration getConfiguration() {
+    return Tang.Factory.getTang().newConfigurationBuilder()
+        .bind(LocalAddressProvider.class, ContainerBasedLocalAddressProvider.class)
+        .build();
+  }
+
+  @Override
+  public String toString() {
+    return "ContainerBasedLocalAddressProvider:" + this.getLocalAddress();
+  }
+
+  private String readFile(final String path, final Charset encoding)
+      throws IOException {
+    final byte[] encoded = Files.readAllBytes(Paths.get(path));
+    final String ipString = new String(encoded, encoding);
+    final Matcher matcher = IPADDRESS_PATTERN.matcher(StringUtils.trim(ipString));
+
+    if (!matcher.matches()) {
+      throw new RuntimeException(String.format("File at location %s has invalid ip address", path));
+    }
+
+    return matcher.group();
+  }
+}
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultRemoteManagerImplementation.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultRemoteManagerImplementation.java
index 4566aca..2e4f3d3 100644
--- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultRemoteManagerImplementation.java
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/impl/DefaultRemoteManagerImplementation.java
@@ -27,7 +27,6 @@
 import org.apache.reef.wake.remote.ports.TcpPortProvider;
 import org.apache.reef.wake.remote.transport.Transport;
 import org.apache.reef.wake.remote.transport.TransportFactory;
-import org.apache.reef.wake.remote.transport.netty.NettyMessagingTransport;
 
 import javax.inject.Inject;
 import java.net.InetSocketAddress;
@@ -53,11 +52,6 @@
    */
   private static final long CLOSE_EXECUTOR_TIMEOUT = 10000; //ms
 
-  /**
-   * Indicates a hostname that isn't set or known.
-   */
-  public static final String UNKNOWN_HOST_NAME = NettyMessagingTransport.UNKNOWN_HOST_NAME;
-
   private final AtomicBoolean closed = new AtomicBoolean(false);
   private final RemoteSeqNumGenerator seqGen = new RemoteSeqNumGenerator();
 
@@ -95,7 +89,10 @@
 
     this.handlerContainer.setTransport(this.transport);
 
-    this.myIdentifier = new SocketRemoteIdentifier((InetSocketAddress)this.transport.getLocalAddress());
+    InetSocketAddress address = new InetSocketAddress(
+        localAddressProvider.getLocalAddress(),
+        this.transport.getListeningPort());
+    this.myIdentifier = new SocketRemoteIdentifier(address);
 
     this.reSendStage = new RemoteSenderStage(codec, this.transport, 10);
 
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/ListTcpPortProvider.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/SetTcpPortProvider.java
similarity index 69%
rename from lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/ListTcpPortProvider.java
rename to lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/SetTcpPortProvider.java
index c28a638..a6108af 100644
--- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/ListTcpPortProvider.java
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/SetTcpPortProvider.java
@@ -18,28 +18,27 @@
  */
 package org.apache.reef.wake.remote.ports;
 
-
 import org.apache.commons.lang.StringUtils;
 import org.apache.reef.tang.annotations.Parameter;
-import org.apache.reef.wake.remote.ports.parameters.TcpPortList;
+import org.apache.reef.wake.remote.ports.parameters.TcpPortSet;
 
 import javax.inject.Inject;
 import java.util.Iterator;
-import java.util.List;
+import java.util.Set;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
 /**
- * A TcpPortProvider which gives out random ports in a range.
+ * A TcpPortProvider which gives out ports in a set.
  */
-public final class ListTcpPortProvider implements TcpPortProvider {
+public final class SetTcpPortProvider implements TcpPortProvider {
 
-  private static final Logger LOG = Logger.getLogger(ListTcpPortProvider.class.getName());
-  private final List<Integer> tcpPortList;
+  private static final Logger LOG = Logger.getLogger(SetTcpPortProvider.class.getName());
+  private final Set<Integer> tcpPortSet;
 
   @Inject
-  public ListTcpPortProvider(@Parameter(TcpPortList.class) final List<Integer> tcpPortList) {
-    this.tcpPortList = tcpPortList;
+  public SetTcpPortProvider(@Parameter(TcpPortSet.class) final Set<Integer> tcpPortSet) {
+    this.tcpPortSet = tcpPortSet;
     LOG.log(Level.FINE, "Instantiating {0}", this);
   }
 
@@ -50,11 +49,11 @@
    */
   @Override
   public Iterator<Integer> iterator() {
-    return this.tcpPortList.iterator();
+    return this.tcpPortSet.iterator();
   }
 
   @Override
   public String toString() {
-    return "ListTcpPortProvider{" + StringUtils.join(this.tcpPortList, ',') + '}';
+    return "SetTcpPortProvider{" + StringUtils.join(this.tcpPortSet, ',') + '}';
   }
 }
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/parameters/TcpPortList.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/parameters/TcpPortSet.java
similarity index 83%
rename from lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/parameters/TcpPortList.java
rename to lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/parameters/TcpPortSet.java
index c7f4fbf..47025e8 100644
--- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/parameters/TcpPortList.java
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/ports/parameters/TcpPortSet.java
@@ -20,18 +20,17 @@
 
 import org.apache.reef.tang.annotations.Name;
 import org.apache.reef.tang.annotations.NamedParameter;
-
-import java.util.List;
+import java.util.Set;
 
 /**
- * An list of tcp port numbers to try.
+ * A set of tcp port numbers to try.
  */
-@NamedParameter(doc = "An list of tcp port numbers to try")
-public final class TcpPortList implements Name<List<Integer>> {
+@NamedParameter(doc = "Set of tcp port numbers")
+public final class TcpPortSet implements Name<Set<Integer>> {
 
   /**
    * Empty private constructor to prohibit instantiation of utility class.
    */
-  private TcpPortList() {
+  private TcpPortSet() {
   }
 }
diff --git a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyMessagingTransport.java b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyMessagingTransport.java
index ea221ec..b51905e 100644
--- a/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyMessagingTransport.java
+++ b/lang/java/reef-wake/wake/src/main/java/org/apache/reef/wake/remote/transport/netty/NettyMessagingTransport.java
@@ -255,9 +255,9 @@
           return link;
         }
       }
-      
+
       if (i == this.numberOfTries) {
-        // Connection failure 
+        // Connection failure
         throw new ConnectException("Connection to " + remoteAddr + " refused");
       }
 
@@ -323,7 +323,7 @@
         }
       }
     }
-    
+
     return link;
   }
 
diff --git a/lang/java/reef-webserver/src/main/java/org/apache/reef/webserver/HttpServerImpl.java b/lang/java/reef-webserver/src/main/java/org/apache/reef/webserver/HttpServerImpl.java
index 67dce8b..a10a4eb 100644
--- a/lang/java/reef-webserver/src/main/java/org/apache/reef/webserver/HttpServerImpl.java
+++ b/lang/java/reef-webserver/src/main/java/org/apache/reef/webserver/HttpServerImpl.java
@@ -21,16 +21,15 @@
 import org.apache.reef.tang.annotations.Parameter;
 import org.apache.reef.util.logging.LoggingScope;
 import org.apache.reef.util.logging.LoggingScopeFactory;
+import org.apache.reef.wake.remote.RemoteConfiguration;
 import org.apache.reef.wake.remote.address.LocalAddressProvider;
 import org.apache.reef.wake.remote.ports.TcpPortProvider;
-import org.apache.reef.wake.remote.ports.parameters.TcpPortRangeBegin;
 import org.mortbay.jetty.Connector;
 import org.mortbay.jetty.Server;
 import org.mortbay.jetty.bio.SocketConnector;
 
 import javax.inject.Inject;
 import java.net.BindException;
-import java.util.Iterator;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -39,6 +38,11 @@
  */
 public final class HttpServerImpl implements HttpServer {
   /**
+   * Indicates a hostname that isn't set or known.
+   */
+  private static final String UNKNOWN_HOST_NAME = "##UNKNOWN##";
+
+  /**
    * Standard Java logger.
    */
   private static final Logger LOG = Logger.getLogger(HttpServerImpl.class.getName());
@@ -64,35 +68,36 @@
   private final LoggingScopeFactory loggingScopeFactory;
 
   /**
-   * The address provider for the HTTPServer.
+   * The host address for the HTTPServer.
    */
-  private final LocalAddressProvider addressProvider;
+  private final String hostAddress;
 
   /**
    * Constructor of HttpServer that wraps Jetty Server.
    *
    * @param jettyHandler
-   * @param portNumber
    * @throws Exception
    */
   @Inject
-  HttpServerImpl(final JettyHandler jettyHandler,
+  HttpServerImpl(@Parameter(RemoteConfiguration.HostAddress.class) final String hostAddress,
+                 final JettyHandler jettyHandler,
                  final LocalAddressProvider addressProvider,
-                 @Parameter(TcpPortRangeBegin.class)final int portNumber,
                  final TcpPortProvider tcpPortProvider,
                  final LoggingScopeFactory loggingScopeFactory) throws Exception {
-    this.addressProvider = addressProvider;
     this.loggingScopeFactory = loggingScopeFactory;
     this.jettyHandler = jettyHandler;
-    int availablePort = portNumber;
-    Server srv = null;
 
+    this.hostAddress = UNKNOWN_HOST_NAME.equals(hostAddress) ? addressProvider.getLocalAddress() : hostAddress;
     try (final LoggingScope ls = this.loggingScopeFactory.httpServer()) {
 
-      final Iterator<Integer> ports = tcpPortProvider.iterator();
-      while (ports.hasNext() && srv  == null) {
-        availablePort = ports.next();
-        srv = tryPort(availablePort);
+      Server srv = null;
+      int availablePort = -1;
+      for (final int p : tcpPortProvider) {
+        srv = tryPort(p);
+        if (srv != null) {
+          availablePort = p;
+          break;
+        }
       }
 
       if (srv  != null) {
@@ -109,7 +114,7 @@
   private Server tryPort(final int portNumber) throws Exception {
     Server srv = new Server();
     final Connector connector = new SocketConnector();
-    connector.setHost(addressProvider.getLocalAddress());
+    connector.setHost(this.hostAddress);
     connector.setPort(portNumber);
     srv.addConnector(connector);
     try {
@@ -117,7 +122,8 @@
       LOG.log(Level.INFO, "Jetty Server started with port: {0}", portNumber);
     } catch (final BindException ex) {
       srv = null;
-      LOG.log(Level.FINEST, "Cannot use port: {0}. Will try another", portNumber);
+      LOG.log(Level.FINEST, "Cannot use host: {0},port: {1}. Will try another",
+          new Object[] {this.hostAddress, portNumber});
     }
     return srv;
   }