[REEF-1998] Create azure blob storage container if it does not exist when calling Azure Blob File System APIs
This addressed the issue by creating a new azure blob container if it
did not exist already for the following api calls for
AzureBlockBlobFileSystem.
* Create
* Copy
* CopyFromLocal
JIRA
[REEF-1998](https://issues.apache.org/jira/browse/REEF-1998)
Pull Request:
This closes #1444
diff --git a/lang/cs/Org.Apache.REEF.IO.Tests/TestAzureBlockBlobFileSystem.cs b/lang/cs/Org.Apache.REEF.IO.Tests/TestAzureBlockBlobFileSystem.cs
index dc48d5e..6623051 100644
--- a/lang/cs/Org.Apache.REEF.IO.Tests/TestAzureBlockBlobFileSystem.cs
+++ b/lang/cs/Org.Apache.REEF.IO.Tests/TestAzureBlockBlobFileSystem.cs
@@ -18,7 +18,6 @@
using System;
using System.IO;
using System.Linq;
-using System.Threading.Tasks;
using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage.Blob;
using NSubstitute;
@@ -36,13 +35,13 @@
/// </summary>
public sealed class TestAzureBlockBlobFileSystem
{
- private static readonly Uri FakeUri = new Uri("http://fake.com");
+ private static readonly Uri FakeUri = new Uri("http://fake.com/container/file");
[Fact]
public void TestCreate()
{
var testContext = new TestContext();
- Stream stream = testContext.GetAzureFileSystem().Create(new Uri(FakeUri, "container/file"));
+ Stream stream = testContext.GetAzureFileSystem().Create(FakeUri);
testContext.TestCloudBlockBlob.Received(1).Create();
Assert.Equal(testContext.TestCreateStream, stream);
}
@@ -51,7 +50,7 @@
public void TestOpen()
{
var testContext = new TestContext();
- Stream stream = testContext.GetAzureFileSystem().Open(new Uri(FakeUri, "container/file"));
+ Stream stream = testContext.GetAzureFileSystem().Open(FakeUri);
testContext.TestCloudBlockBlob.Received(1).Open();
Assert.Equal(testContext.TestOpenStream, stream);
}
@@ -60,7 +59,7 @@
public void TestDelete()
{
var testContext = new TestContext();
- testContext.GetAzureFileSystem().Delete(new Uri(FakeUri, "container/file"));
+ testContext.GetAzureFileSystem().Delete(FakeUri);
testContext.TestCloudBlockBlob.Received(1).Delete();
}
@@ -99,7 +98,7 @@
[Fact]
public void TestDeleteDirectoryFails()
{
- Assert.Throws<StorageException>(() => new TestContext().GetAzureFileSystem().DeleteDirectory(FakeUri));
+ Assert.Throws<StorageException>(() => new TestContext().GetAzureFileSystem().DeleteDirectory(new Uri(FakeUri.GetLeftPart(UriPartial.Authority))));
}
[Fact]
@@ -128,8 +127,7 @@
public void TestCreateUriForPath()
{
var testContext = new TestContext();
- const string dirStructure = "container/directory";
- Assert.Equal(new Uri(FakeUri, dirStructure), testContext.GetAzureFileSystem().CreateUriForPath(dirStructure));
+ Assert.Equal(FakeUri, testContext.GetAzureFileSystem().CreateUriForPath(FakeUri.LocalPath));
}
private sealed class TestContext
@@ -151,9 +149,10 @@
var injector = TangFactory.GetTang().NewInjector(conf);
injector.BindVolatileInstance(TestCloudBlobClient);
var fs = injector.GetInstance<AzureBlockBlobFileSystem>();
- TestCloudBlobClient.BaseUri.ReturnsForAnyArgs(FakeUri);
+ TestCloudBlobClient.BaseUri.ReturnsForAnyArgs(new Uri(FakeUri.GetLeftPart(UriPartial.Authority)));
TestCloudBlockBlob.Open().Returns(TestOpenStream);
TestCloudBlockBlob.Create().Returns(TestCreateStream);
+ TestCloudBlockBlob.Blob.ReturnsForAnyArgs(new CloudBlockBlob(FakeUri));
TestCloudBlobClient.GetBlockBlobReference(FakeUri).ReturnsForAnyArgs(TestCloudBlockBlob);
TestCloudBlobClient.GetContainerReference("container").ReturnsForAnyArgs(TestCloudBlobContainer);
TestCloudBlobContainer.GetDirectoryReference("directory").ReturnsForAnyArgs(TestCloudBlobDirectory);
diff --git a/lang/cs/Org.Apache.REEF.IO.Tests/TestAzureBlockBlobFileSystemE2E.cs b/lang/cs/Org.Apache.REEF.IO.Tests/TestAzureBlockBlobFileSystemE2E.cs
index f5665b6..90a4746 100644
--- a/lang/cs/Org.Apache.REEF.IO.Tests/TestAzureBlockBlobFileSystemE2E.cs
+++ b/lang/cs/Org.Apache.REEF.IO.Tests/TestAzureBlockBlobFileSystemE2E.cs
@@ -37,6 +37,7 @@
private const string SkipMessage = "Fill in credentials before running test"; // Use null to run tests
private const string HelloFile = "hello";
private IFileSystem _fileSystem;
+ private CloudBlobClient _client;
private CloudBlobContainer _container;
public TestAzureBlockBlobFileSystemE2E()
@@ -49,8 +50,9 @@
.Build();
_fileSystem = TangFactory.GetTang().NewInjector(conf).GetInstance<AzureBlockBlobFileSystem>();
- _container = CloudStorageAccount.Parse(ConnectionString).CreateCloudBlobClient().GetContainerReference(defaultContainerName);
- _container.CreateIfNotExistsAsync().Wait();
+ _client = CloudStorageAccount.Parse(ConnectionString).CreateCloudBlobClient();
+ _container = _client.GetContainerReference(defaultContainerName);
+ _container.CreateIfNotExists();
}
public void Dispose()
@@ -106,20 +108,22 @@
[Fact(Skip = SkipMessage)]
public void TestCreateE2E()
{
+ var container = _client.GetContainerReference("create-reef-test-container-" + Guid.NewGuid());
const string Text = "Hello Azure Blob";
- var blob = _container.GetBlockBlobReference(HelloFile);
+ var blob = container.GetBlockBlobReference(HelloFile);
Assert.False(CheckBlobExists(blob));
- using (var streamWriter = new StreamWriter(_fileSystem.Create(PathToFile(HelloFile))))
+ using (var streamWriter = new StreamWriter(_fileSystem.Create(PathToFile(HelloFile, container.Name))))
{
streamWriter.Write(Text);
}
- blob = _container.GetBlockBlobReference(HelloFile);
+ blob = container.GetBlockBlobReference(HelloFile);
Assert.True(CheckBlobExists(blob));
using (var reader = new StreamReader(blob.OpenRead()))
{
string streamText = reader.ReadToEnd();
Assert.Equal(Text, streamText);
}
+ container.DeleteIfExistsAsync().Wait();
}
[Fact(Skip = SkipMessage)]
@@ -148,19 +152,21 @@
{
const string SrcFileName = "src";
const string DestFileName = "dest";
+ var destContainer = _client.GetContainerReference("dest-reef-test-container-" + Guid.NewGuid());
var srcFilePath = PathToFile(SrcFileName);
- var destFilePath = PathToFile(DestFileName);
+ var destFilePath = PathToFile(DestFileName, destContainer.Name);
ICloudBlob srcBlob = _container.GetBlockBlobReference(SrcFileName);
UploadFromString(srcBlob, "hello");
Assert.True(CheckBlobExists(srcBlob));
- ICloudBlob destBlob = _container.GetBlockBlobReference(DestFileName);
+ ICloudBlob destBlob = destContainer.GetBlockBlobReference(DestFileName);
Assert.False(CheckBlobExists(destBlob));
_fileSystem.Copy(srcFilePath, destFilePath);
- destBlob = GetBlobReferenceFromServer(_container, DestFileName);
+ destBlob = GetBlobReferenceFromServer(destContainer, DestFileName);
Assert.True(CheckBlobExists(destBlob));
srcBlob = GetBlobReferenceFromServer(_container, SrcFileName);
Assert.True(CheckBlobExists(srcBlob));
- Assert.Equal(DownloadText(_container.GetBlockBlobReference(SrcFileName)), DownloadText(_container.GetBlockBlobReference(DestFileName)));
+ Assert.Equal(DownloadText(_container.GetBlockBlobReference(SrcFileName)), DownloadText(destContainer.GetBlockBlobReference(DestFileName)));
+ destContainer.DeleteIfExistsAsync().Wait();
}
[Fact(Skip = SkipMessage)]
@@ -169,6 +175,7 @@
var helloFilePath = PathToFile(HelloFile);
var blob = _container.GetBlockBlobReference(HelloFile);
var tempFilePath = Path.GetTempFileName();
+ File.Delete(tempFilePath); // Delete the file as CopyToLocal will create it
const string Text = "hello";
try
@@ -187,8 +194,9 @@
[Fact(Skip = SkipMessage)]
public void TestCopyFromLocalE2E()
{
- var helloFilePath = PathToFile(HelloFile);
- ICloudBlob blob = _container.GetBlockBlobReference(HelloFile);
+ var container = _client.GetContainerReference("copy-reef-test-container-" + Guid.NewGuid());
+ var helloFilePath = PathToFile(HelloFile, container.Name);
+ ICloudBlob blob = container.GetBlockBlobReference(HelloFile);
Assert.False(CheckBlobExists(blob));
var tempFilePath = Path.GetTempFileName();
const string Text = "hello";
@@ -196,7 +204,7 @@
{
File.WriteAllText(tempFilePath, Text);
_fileSystem.CopyFromLocal(tempFilePath, helloFilePath);
- blob = GetBlobReferenceFromServer(_container, HelloFile);
+ blob = GetBlobReferenceFromServer(container, HelloFile);
Assert.True(CheckBlobExists(blob));
using (var stream = new MemoryStream())
{
@@ -214,6 +222,7 @@
{
File.Delete(tempFilePath);
}
+ container.DeleteIfExistsAsync().Wait();
}
[Fact(Skip = SkipMessage)]
@@ -227,8 +236,7 @@
public void TestDeleteDirectoryFirstLevelE2E()
{
const string Directory = "dir";
- var blockBlobs = new List<CloudBlockBlob>();
-
+ var blockBlobs = new List<CloudBlockBlob>();
for (var i = 0; i < 3; i++)
{
var filePath = Directory + '/' + i;
@@ -255,7 +263,6 @@
const string Directory2 = "dir2";
var blockBlobs1 = new List<CloudBlockBlob>();
var blockBlobs2 = new List<CloudBlockBlob>();
-
for (var i = 0; i < 3; i++)
{
var filePath1 = Directory1 + '/' + i;
@@ -291,9 +298,10 @@
blob.UploadFromByteArrayAsync(byteArray, 0, byteArray.Length).Wait();
}
- private Uri PathToFile(string filePath)
+ private Uri PathToFile(string filePath, string containerName = null)
{
- return _fileSystem.CreateUriForPath(_container.Name + '/' + filePath);
+ containerName = containerName ?? _container.Name;
+ return _fileSystem.CreateUriForPath(containerName + '/' + filePath);
}
}
}
diff --git a/lang/cs/Org.Apache.REEF.IO/FileSystem/AzureBlob/AzureBlockBlobFileSystem.cs b/lang/cs/Org.Apache.REEF.IO/FileSystem/AzureBlob/AzureBlockBlobFileSystem.cs
index 81a7c1f..d333839 100644
--- a/lang/cs/Org.Apache.REEF.IO/FileSystem/AzureBlob/AzureBlockBlobFileSystem.cs
+++ b/lang/cs/Org.Apache.REEF.IO/FileSystem/AzureBlob/AzureBlockBlobFileSystem.cs
@@ -54,7 +54,9 @@
/// </summary>
public Stream Create(Uri fileUri)
{
- return _client.GetBlockBlobReference(fileUri).Create();
+ var blockBlob = _client.GetBlockBlobReference(fileUri);
+ _client.GetContainerReference(blockBlob.Blob.Container.Name).CreateIfNotExists();
+ return blockBlob.Create();
}
/// <summary>
@@ -81,8 +83,9 @@
/// </summary>
public void Copy(Uri sourceUri, Uri destinationUri)
{
- _client.GetBlockBlobReference(destinationUri).StartCopy(sourceUri);
var blockBlob = _client.GetBlockBlobReference(destinationUri);
+ _client.GetContainerReference(blockBlob.Blob.Container.Name).CreateIfNotExists();
+ blockBlob.StartCopy(sourceUri);
blockBlob.FetchAttributes();
while (blockBlob.CopyState.Status == CopyStatus.Pending)
@@ -107,7 +110,9 @@
/// </summary>
public void CopyFromLocal(string localFileName, Uri remoteFileUri)
{
- _client.GetBlockBlobReference(remoteFileUri).UploadFromFile(localFileName, FileMode.Open);
+ var blockBlob = _client.GetBlockBlobReference(remoteFileUri);
+ _client.GetContainerReference(blockBlob.Blob.Container.Name).CreateIfNotExists();
+ blockBlob.UploadFromFile(localFileName, FileMode.Open);
}
/// <summary>
diff --git a/lang/cs/Org.Apache.REEF.IO/FileSystem/AzureBlob/AzureCloudBlobContainer.cs b/lang/cs/Org.Apache.REEF.IO/FileSystem/AzureBlob/AzureCloudBlobContainer.cs
index 03b9123..c71aa7e 100644
--- a/lang/cs/Org.Apache.REEF.IO/FileSystem/AzureBlob/AzureCloudBlobContainer.cs
+++ b/lang/cs/Org.Apache.REEF.IO/FileSystem/AzureBlob/AzureCloudBlobContainer.cs
@@ -31,6 +31,11 @@
_container = container;
}
+ public bool CreateIfNotExists()
+ {
+ return _container.CreateIfNotExists();
+ }
+
public void DeleteIfExists()
{
_container.DeleteIfExistsAsync().Wait();
diff --git a/lang/cs/Org.Apache.REEF.IO/FileSystem/AzureBlob/ICloudBlobContainer.cs b/lang/cs/Org.Apache.REEF.IO/FileSystem/AzureBlob/ICloudBlobContainer.cs
index ce3cf2c..f5c124c 100644
--- a/lang/cs/Org.Apache.REEF.IO/FileSystem/AzureBlob/ICloudBlobContainer.cs
+++ b/lang/cs/Org.Apache.REEF.IO/FileSystem/AzureBlob/ICloudBlobContainer.cs
@@ -23,6 +23,12 @@
internal interface ICloudBlobContainer
{
/// <summary>
+ /// Creates the <see cref="ICloudBlobContainer"/> if it does not exist already.
+ /// </summary>
+ /// <returns>Whether a new container was created or not</returns>
+ bool CreateIfNotExists();
+
+ /// <summary>
/// Deletes the <see cref="ICloudBlobContainer"/> if it exists.
/// Does a round trip to the Blob Server.
/// </summary>