blob: 759296ef260e567ec420528f91d7705ee2e9c0e1 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.extensions.gcp.util;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.when;
import com.google.api.client.googleapis.batch.BatchRequest;
import com.google.api.client.googleapis.json.GoogleJsonError.ErrorInfo;
import com.google.api.client.googleapis.json.GoogleJsonResponseException;
import com.google.api.client.http.HttpRequest;
import com.google.api.client.http.HttpResponse;
import com.google.api.client.http.HttpStatusCodes;
import com.google.api.client.http.HttpTransport;
import com.google.api.client.http.LowLevelHttpRequest;
import com.google.api.client.http.LowLevelHttpResponse;
import com.google.api.client.json.GenericJson;
import com.google.api.client.json.Json;
import com.google.api.client.json.JsonFactory;
import com.google.api.client.json.jackson2.JacksonFactory;
import com.google.api.client.testing.http.HttpTesting;
import com.google.api.client.testing.http.MockHttpTransport;
import com.google.api.client.testing.http.MockLowLevelHttpRequest;
import com.google.api.client.testing.http.MockLowLevelHttpResponse;
import com.google.api.client.util.BackOff;
import com.google.api.services.storage.Storage;
import com.google.api.services.storage.model.Bucket;
import com.google.api.services.storage.model.Objects;
import com.google.api.services.storage.model.StorageObject;
import com.google.cloud.hadoop.gcsio.GoogleCloudStorageReadChannel;
import com.google.cloud.hadoop.gcsio.GoogleCloudStorageReadOptions;
import com.google.cloud.hadoop.util.ClientRequestHelper;
import java.io.ByteArrayInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.math.BigInteger;
import java.net.SocketTimeoutException;
import java.nio.channels.SeekableByteChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.AccessDeniedException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.beam.sdk.extensions.gcp.auth.TestCredential;
import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
import org.apache.beam.sdk.extensions.gcp.util.GcsUtil.RewriteOp;
import org.apache.beam.sdk.extensions.gcp.util.GcsUtil.StorageObjectOrIOException;
import org.apache.beam.sdk.extensions.gcp.util.gcsfs.GcsPath;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.util.FluentBackoff;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Lists;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.Mockito;
/** Test case for {@link GcsUtil}. */
@RunWith(JUnit4.class)
public class GcsUtilTest {
@Rule public ExpectedException thrown = ExpectedException.none();
@Test
public void testGlobTranslation() {
assertEquals("foo", GcsUtil.wildcardToRegexp("foo"));
assertEquals("fo[^/]*o", GcsUtil.wildcardToRegexp("fo*o"));
assertEquals("f[^/]*o\\.[^/]", GcsUtil.wildcardToRegexp("f*o.?"));
assertEquals("foo-[0-9][^/]*", GcsUtil.wildcardToRegexp("foo-[0-9]*"));
assertEquals("foo-[0-9].*", GcsUtil.wildcardToRegexp("foo-[0-9]**"));
assertEquals(".*foo", GcsUtil.wildcardToRegexp("**/*foo"));
assertEquals(".*foo", GcsUtil.wildcardToRegexp("**foo"));
assertEquals("foo/[^/]*", GcsUtil.wildcardToRegexp("foo/*"));
assertEquals("foo[^/]*", GcsUtil.wildcardToRegexp("foo*"));
assertEquals("foo/[^/]*/[^/]*/[^/]*", GcsUtil.wildcardToRegexp("foo/*/*/*"));
assertEquals("foo/[^/]*/.*", GcsUtil.wildcardToRegexp("foo/*/**"));
assertEquals("foo.*baz", GcsUtil.wildcardToRegexp("foo**baz"));
}
private static GcsOptions gcsOptionsWithTestCredential() {
GcsOptions pipelineOptions = PipelineOptionsFactory.as(GcsOptions.class);
pipelineOptions.setGcpCredential(new TestCredential());
return pipelineOptions;
}
@Test
public void testCreationWithDefaultOptions() {
GcsOptions pipelineOptions = gcsOptionsWithTestCredential();
assertNotNull(pipelineOptions.getGcpCredential());
}
@Test
public void testUploadBufferSizeDefault() {
GcsOptions pipelineOptions = gcsOptionsWithTestCredential();
GcsUtil util = pipelineOptions.getGcsUtil();
assertNull(util.getUploadBufferSizeBytes());
}
@Test
public void testUploadBufferSizeUserSpecified() {
GcsOptions pipelineOptions = gcsOptionsWithTestCredential();
pipelineOptions.setGcsUploadBufferSizeBytes(12345);
GcsUtil util = pipelineOptions.getGcsUtil();
assertEquals((Integer) 12345, util.getUploadBufferSizeBytes());
}
@Test
public void testCreationWithExecutorServiceProvided() {
GcsOptions pipelineOptions = gcsOptionsWithTestCredential();
pipelineOptions.setExecutorService(Executors.newCachedThreadPool());
assertSame(pipelineOptions.getExecutorService(), pipelineOptions.getGcsUtil().executorService);
}
@Test
public void testCreationWithGcsUtilProvided() {
GcsOptions pipelineOptions = PipelineOptionsFactory.as(GcsOptions.class);
GcsUtil gcsUtil = Mockito.mock(GcsUtil.class);
pipelineOptions.setGcsUtil(gcsUtil);
assertSame(gcsUtil, pipelineOptions.getGcsUtil());
}
@Test
public void testMultipleThreadsCanCompleteOutOfOrderWithDefaultThreadPool() throws Exception {
GcsOptions pipelineOptions = PipelineOptionsFactory.as(GcsOptions.class);
ExecutorService executorService = pipelineOptions.getExecutorService();
int numThreads = 100;
final CountDownLatch[] countDownLatches = new CountDownLatch[numThreads];
for (int i = 0; i < numThreads; i++) {
final int currentLatch = i;
countDownLatches[i] = new CountDownLatch(1);
executorService.execute(
() -> {
// Wait for latch N and then release latch N - 1
try {
countDownLatches[currentLatch].await();
if (currentLatch > 0) {
countDownLatches[currentLatch - 1].countDown();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
});
}
// Release the last latch starting the chain reaction.
countDownLatches[countDownLatches.length - 1].countDown();
executorService.shutdown();
assertTrue(
"Expected tasks to complete", executorService.awaitTermination(10, TimeUnit.SECONDS));
}
@Test
public void testGlobExpansion() throws IOException {
GcsOptions pipelineOptions = gcsOptionsWithTestCredential();
GcsUtil gcsUtil = pipelineOptions.getGcsUtil();
Storage mockStorage = Mockito.mock(Storage.class);
gcsUtil.setStorageClient(mockStorage);
Storage.Objects mockStorageObjects = Mockito.mock(Storage.Objects.class);
Storage.Objects.Get mockStorageGet = Mockito.mock(Storage.Objects.Get.class);
Storage.Objects.List mockStorageList = Mockito.mock(Storage.Objects.List.class);
Objects modelObjects = new Objects();
List<StorageObject> items = new ArrayList<>();
// A directory
items.add(new StorageObject().setBucket("testbucket").setName("testdirectory/"));
// Files within the directory
items.add(new StorageObject().setBucket("testbucket").setName("testdirectory/file1name"));
items.add(new StorageObject().setBucket("testbucket").setName("testdirectory/file2name"));
items.add(new StorageObject().setBucket("testbucket").setName("testdirectory/file3name"));
items.add(new StorageObject().setBucket("testbucket").setName("testdirectory/otherfile"));
items.add(new StorageObject().setBucket("testbucket").setName("testdirectory/anotherfile"));
modelObjects.setItems(items);
when(mockStorage.objects()).thenReturn(mockStorageObjects);
when(mockStorageObjects.get("testbucket", "testdirectory/otherfile"))
.thenReturn(mockStorageGet);
when(mockStorageObjects.list("testbucket")).thenReturn(mockStorageList);
when(mockStorageGet.execute())
.thenReturn(new StorageObject().setBucket("testbucket").setName("testdirectory/otherfile"));
when(mockStorageList.execute()).thenReturn(modelObjects);
// Test a single file.
{
GcsPath pattern = GcsPath.fromUri("gs://testbucket/testdirectory/otherfile");
List<GcsPath> expectedFiles =
ImmutableList.of(GcsPath.fromUri("gs://testbucket/testdirectory/otherfile"));
assertThat(expectedFiles, contains(gcsUtil.expand(pattern).toArray()));
}
// Test patterns.
{
GcsPath pattern = GcsPath.fromUri("gs://testbucket/testdirectory/file*");
List<GcsPath> expectedFiles =
ImmutableList.of(
GcsPath.fromUri("gs://testbucket/testdirectory/file1name"),
GcsPath.fromUri("gs://testbucket/testdirectory/file2name"),
GcsPath.fromUri("gs://testbucket/testdirectory/file3name"));
assertThat(expectedFiles, contains(gcsUtil.expand(pattern).toArray()));
}
{
GcsPath pattern = GcsPath.fromUri("gs://testbucket/testdirectory/file[1-3]*");
List<GcsPath> expectedFiles =
ImmutableList.of(
GcsPath.fromUri("gs://testbucket/testdirectory/file1name"),
GcsPath.fromUri("gs://testbucket/testdirectory/file2name"),
GcsPath.fromUri("gs://testbucket/testdirectory/file3name"));
assertThat(expectedFiles, contains(gcsUtil.expand(pattern).toArray()));
}
{
GcsPath pattern = GcsPath.fromUri("gs://testbucket/testdirectory/file?name");
List<GcsPath> expectedFiles =
ImmutableList.of(
GcsPath.fromUri("gs://testbucket/testdirectory/file1name"),
GcsPath.fromUri("gs://testbucket/testdirectory/file2name"),
GcsPath.fromUri("gs://testbucket/testdirectory/file3name"));
assertThat(expectedFiles, contains(gcsUtil.expand(pattern).toArray()));
}
{
GcsPath pattern = GcsPath.fromUri("gs://testbucket/test*ectory/fi*name");
List<GcsPath> expectedFiles =
ImmutableList.of(
GcsPath.fromUri("gs://testbucket/testdirectory/file1name"),
GcsPath.fromUri("gs://testbucket/testdirectory/file2name"),
GcsPath.fromUri("gs://testbucket/testdirectory/file3name"));
assertThat(expectedFiles, contains(gcsUtil.expand(pattern).toArray()));
}
}
@Test
public void testRecursiveGlobExpansion() throws IOException {
GcsOptions pipelineOptions = gcsOptionsWithTestCredential();
GcsUtil gcsUtil = pipelineOptions.getGcsUtil();
Storage mockStorage = Mockito.mock(Storage.class);
gcsUtil.setStorageClient(mockStorage);
Storage.Objects mockStorageObjects = Mockito.mock(Storage.Objects.class);
Storage.Objects.Get mockStorageGet = Mockito.mock(Storage.Objects.Get.class);
Storage.Objects.List mockStorageList = Mockito.mock(Storage.Objects.List.class);
Objects modelObjects = new Objects();
List<StorageObject> items = new ArrayList<>();
// A directory
items.add(new StorageObject().setBucket("testbucket").setName("testdirectory/"));
// Files within the directory
items.add(new StorageObject().setBucket("testbucket").setName("test/directory/file1.txt"));
items.add(new StorageObject().setBucket("testbucket").setName("test/directory/file2.txt"));
items.add(new StorageObject().setBucket("testbucket").setName("test/directory/file3.txt"));
items.add(new StorageObject().setBucket("testbucket").setName("test/directory/otherfile"));
items.add(new StorageObject().setBucket("testbucket").setName("test/directory/anotherfile"));
items.add(new StorageObject().setBucket("testbucket").setName("test/file4.txt"));
modelObjects.setItems(items);
when(mockStorage.objects()).thenReturn(mockStorageObjects);
when(mockStorageObjects.get("testbucket", "test/directory/otherfile"))
.thenReturn(mockStorageGet);
when(mockStorageObjects.list("testbucket")).thenReturn(mockStorageList);
when(mockStorageGet.execute())
.thenReturn(
new StorageObject().setBucket("testbucket").setName("test/directory/otherfile"));
when(mockStorageList.execute()).thenReturn(modelObjects);
{
GcsPath pattern = GcsPath.fromUri("gs://testbucket/test/**/*.txt");
List<GcsPath> expectedFiles =
ImmutableList.of(
GcsPath.fromUri("gs://testbucket/test/directory/file1.txt"),
GcsPath.fromUri("gs://testbucket/test/directory/file2.txt"),
GcsPath.fromUri("gs://testbucket/test/directory/file3.txt"),
GcsPath.fromUri("gs://testbucket/test/file4.txt"));
assertThat(expectedFiles, contains(gcsUtil.expand(pattern).toArray()));
}
}
// GCSUtil.expand() should fail when matching a single object when that object does not exist.
// We should return the empty result since GCS get object is strongly consistent.
@Test
public void testNonExistentObjectReturnsEmptyResult() throws IOException {
GcsOptions pipelineOptions = gcsOptionsWithTestCredential();
GcsUtil gcsUtil = pipelineOptions.getGcsUtil();
Storage mockStorage = Mockito.mock(Storage.class);
gcsUtil.setStorageClient(mockStorage);
Storage.Objects mockStorageObjects = Mockito.mock(Storage.Objects.class);
Storage.Objects.Get mockStorageGet = Mockito.mock(Storage.Objects.Get.class);
GcsPath pattern = GcsPath.fromUri("gs://testbucket/testdirectory/nonexistentfile");
GoogleJsonResponseException expectedException =
googleJsonResponseException(
HttpStatusCodes.STATUS_CODE_NOT_FOUND, "It don't exist", "Nothing here to see");
when(mockStorage.objects()).thenReturn(mockStorageObjects);
when(mockStorageObjects.get(pattern.getBucket(), pattern.getObject()))
.thenReturn(mockStorageGet);
when(mockStorageGet.execute()).thenThrow(expectedException);
assertEquals(Collections.emptyList(), gcsUtil.expand(pattern));
}
// GCSUtil.expand() should fail for other errors such as access denied.
@Test
public void testAccessDeniedObjectThrowsIOException() throws IOException {
GcsOptions pipelineOptions = gcsOptionsWithTestCredential();
GcsUtil gcsUtil = pipelineOptions.getGcsUtil();
Storage mockStorage = Mockito.mock(Storage.class);
gcsUtil.setStorageClient(mockStorage);
Storage.Objects mockStorageObjects = Mockito.mock(Storage.Objects.class);
Storage.Objects.Get mockStorageGet = Mockito.mock(Storage.Objects.Get.class);
GcsPath pattern = GcsPath.fromUri("gs://testbucket/testdirectory/accessdeniedfile");
GoogleJsonResponseException expectedException =
googleJsonResponseException(
HttpStatusCodes.STATUS_CODE_FORBIDDEN,
"Waves hand mysteriously",
"These aren't the buckets you're looking for");
when(mockStorage.objects()).thenReturn(mockStorageObjects);
when(mockStorageObjects.get(pattern.getBucket(), pattern.getObject()))
.thenReturn(mockStorageGet);
when(mockStorageGet.execute()).thenThrow(expectedException);
thrown.expect(IOException.class);
thrown.expectMessage("Unable to get the file object for path");
gcsUtil.expand(pattern);
}
@Test
public void testFileSizeNonBatch() throws Exception {
GcsOptions pipelineOptions = gcsOptionsWithTestCredential();
GcsUtil gcsUtil = pipelineOptions.getGcsUtil();
Storage mockStorage = Mockito.mock(Storage.class);
gcsUtil.setStorageClient(mockStorage);
Storage.Objects mockStorageObjects = Mockito.mock(Storage.Objects.class);
Storage.Objects.Get mockStorageGet = Mockito.mock(Storage.Objects.Get.class);
when(mockStorage.objects()).thenReturn(mockStorageObjects);
when(mockStorageObjects.get("testbucket", "testobject")).thenReturn(mockStorageGet);
when(mockStorageGet.execute())
.thenReturn(new StorageObject().setSize(BigInteger.valueOf(1000)));
assertEquals(1000, gcsUtil.fileSize(GcsPath.fromComponents("testbucket", "testobject")));
}
@Test
public void testFileSizeWhenFileNotFoundNonBatch() throws Exception {
MockLowLevelHttpResponse notFoundResponse = new MockLowLevelHttpResponse();
notFoundResponse.setContent("");
notFoundResponse.setStatusCode(HttpStatusCodes.STATUS_CODE_NOT_FOUND);
MockHttpTransport mockTransport =
new MockHttpTransport.Builder().setLowLevelHttpResponse(notFoundResponse).build();
GcsOptions pipelineOptions = gcsOptionsWithTestCredential();
GcsUtil gcsUtil = pipelineOptions.getGcsUtil();
gcsUtil.setStorageClient(new Storage(mockTransport, Transport.getJsonFactory(), null));
thrown.expect(FileNotFoundException.class);
gcsUtil.fileSize(GcsPath.fromComponents("testbucket", "testobject"));
}
@Test
public void testRetryFileSizeNonBatch() throws IOException {
GcsOptions pipelineOptions = gcsOptionsWithTestCredential();
GcsUtil gcsUtil = pipelineOptions.getGcsUtil();
Storage mockStorage = Mockito.mock(Storage.class);
gcsUtil.setStorageClient(mockStorage);
Storage.Objects mockStorageObjects = Mockito.mock(Storage.Objects.class);
Storage.Objects.Get mockStorageGet = Mockito.mock(Storage.Objects.Get.class);
BackOff mockBackOff =
BackOffAdapter.toGcpBackOff(FluentBackoff.DEFAULT.withMaxRetries(2).backoff());
when(mockStorage.objects()).thenReturn(mockStorageObjects);
when(mockStorageObjects.get("testbucket", "testobject")).thenReturn(mockStorageGet);
when(mockStorageGet.execute())
.thenThrow(new SocketTimeoutException("SocketException"))
.thenThrow(new SocketTimeoutException("SocketException"))
.thenReturn(new StorageObject().setSize(BigInteger.valueOf(1000)));
assertEquals(
1000,
gcsUtil
.getObject(
GcsPath.fromComponents("testbucket", "testobject"),
mockBackOff,
new FastNanoClockAndSleeper())
.getSize()
.longValue());
assertEquals(BackOff.STOP, mockBackOff.nextBackOffMillis());
}
@Test
public void testGetSizeBytesWhenFileNotFoundBatch() throws Exception {
JsonFactory jsonFactory = new JacksonFactory();
String contentBoundary = "batch_foobarbaz";
String contentBoundaryLine = "--" + contentBoundary;
String endOfContentBoundaryLine = "--" + contentBoundary + "--";
GenericJson error = new GenericJson().set("error", new GenericJson().set("code", 404));
error.setFactory(jsonFactory);
String content =
contentBoundaryLine
+ "\n"
+ "Content-Type: application/http\n"
+ "\n"
+ "HTTP/1.1 404 Not Found\n"
+ "Content-Length: -1\n"
+ "\n"
+ error.toString()
+ "\n"
+ "\n"
+ endOfContentBoundaryLine
+ "\n";
thrown.expect(FileNotFoundException.class);
MockLowLevelHttpResponse notFoundResponse =
new MockLowLevelHttpResponse()
.setContentType("multipart/mixed; boundary=" + contentBoundary)
.setContent(content)
.setStatusCode(HttpStatusCodes.STATUS_CODE_OK);
MockHttpTransport mockTransport =
new MockHttpTransport.Builder().setLowLevelHttpResponse(notFoundResponse).build();
GcsUtil gcsUtil = gcsOptionsWithTestCredential().getGcsUtil();
gcsUtil.setStorageClient(new Storage(mockTransport, Transport.getJsonFactory(), null));
gcsUtil.fileSizes(ImmutableList.of(GcsPath.fromComponents("testbucket", "testobject")));
}
@Test
public void testGetSizeBytesWhenFileNotFoundBatchRetry() throws Exception {
JsonFactory jsonFactory = new JacksonFactory();
String contentBoundary = "batch_foobarbaz";
String contentBoundaryLine = "--" + contentBoundary;
String endOfContentBoundaryLine = "--" + contentBoundary + "--";
GenericJson error = new GenericJson().set("error", new GenericJson().set("code", 404));
error.setFactory(jsonFactory);
String content =
contentBoundaryLine
+ "\n"
+ "Content-Type: application/http\n"
+ "\n"
+ "HTTP/1.1 404 Not Found\n"
+ "Content-Length: -1\n"
+ "\n"
+ error.toString()
+ "\n"
+ "\n"
+ endOfContentBoundaryLine
+ "\n";
thrown.expect(FileNotFoundException.class);
final LowLevelHttpResponse mockResponse = Mockito.mock(LowLevelHttpResponse.class);
when(mockResponse.getContentType()).thenReturn("multipart/mixed; boundary=" + contentBoundary);
// 429: Too many requests, then 200: OK.
when(mockResponse.getStatusCode()).thenReturn(429, 200);
when(mockResponse.getContent()).thenReturn(toStream("error"), toStream(content));
// A mock transport that lets us mock the API responses.
MockHttpTransport mockTransport =
new MockHttpTransport.Builder()
.setLowLevelHttpRequest(
new MockLowLevelHttpRequest() {
@Override
public LowLevelHttpResponse execute() throws IOException {
return mockResponse;
}
})
.build();
GcsUtil gcsUtil = gcsOptionsWithTestCredential().getGcsUtil();
gcsUtil.setStorageClient(
new Storage(mockTransport, Transport.getJsonFactory(), new RetryHttpRequestInitializer()));
gcsUtil.fileSizes(ImmutableList.of(GcsPath.fromComponents("testbucket", "testobject")));
}
@Test
public void testRemoveWhenFileNotFound() throws Exception {
JsonFactory jsonFactory = new JacksonFactory();
String contentBoundary = "batch_foobarbaz";
String contentBoundaryLine = "--" + contentBoundary;
String endOfContentBoundaryLine = "--" + contentBoundary + "--";
GenericJson error = new GenericJson().set("error", new GenericJson().set("code", 404));
error.setFactory(jsonFactory);
String content =
contentBoundaryLine
+ "\n"
+ "Content-Type: application/http\n"
+ "\n"
+ "HTTP/1.1 404 Not Found\n"
+ "Content-Length: -1\n"
+ "\n"
+ error.toString()
+ "\n"
+ "\n"
+ endOfContentBoundaryLine
+ "\n";
final LowLevelHttpResponse mockResponse = Mockito.mock(LowLevelHttpResponse.class);
when(mockResponse.getContentType()).thenReturn("multipart/mixed; boundary=" + contentBoundary);
when(mockResponse.getStatusCode()).thenReturn(200);
when(mockResponse.getContent()).thenReturn(toStream(content));
// A mock transport that lets us mock the API responses.
MockLowLevelHttpRequest request =
new MockLowLevelHttpRequest() {
@Override
public LowLevelHttpResponse execute() throws IOException {
return mockResponse;
}
};
MockHttpTransport mockTransport =
new MockHttpTransport.Builder().setLowLevelHttpRequest(request).build();
GcsUtil gcsUtil = gcsOptionsWithTestCredential().getGcsUtil();
gcsUtil.setStorageClient(
new Storage(mockTransport, Transport.getJsonFactory(), new RetryHttpRequestInitializer()));
gcsUtil.remove(Arrays.asList("gs://some-bucket/already-deleted"));
}
@Test
public void testCreateBucket() throws IOException {
GcsOptions pipelineOptions = gcsOptionsWithTestCredential();
GcsUtil gcsUtil = pipelineOptions.getGcsUtil();
Storage.Buckets mockStorageObjects = Mockito.mock(Storage.Buckets.class);
Storage mockStorage = Mockito.mock(Storage.class);
gcsUtil.setStorageClient(mockStorage);
Storage.Buckets.Insert mockStorageInsert = Mockito.mock(Storage.Buckets.Insert.class);
BackOff mockBackOff = BackOffAdapter.toGcpBackOff(FluentBackoff.DEFAULT.backoff());
when(mockStorage.buckets()).thenReturn(mockStorageObjects);
when(mockStorageObjects.insert(any(String.class), any(Bucket.class)))
.thenReturn(mockStorageInsert);
when(mockStorageInsert.execute())
.thenThrow(new SocketTimeoutException("SocketException"))
.thenReturn(new Bucket());
gcsUtil.createBucket("a", new Bucket(), mockBackOff, new FastNanoClockAndSleeper());
}
@Test
public void testCreateBucketAccessErrors() throws IOException {
GcsOptions pipelineOptions = gcsOptionsWithTestCredential();
GcsUtil gcsUtil = pipelineOptions.getGcsUtil();
Storage mockStorage = Mockito.mock(Storage.class);
gcsUtil.setStorageClient(mockStorage);
Storage.Buckets mockStorageObjects = Mockito.mock(Storage.Buckets.class);
Storage.Buckets.Insert mockStorageInsert = Mockito.mock(Storage.Buckets.Insert.class);
BackOff mockBackOff = BackOffAdapter.toGcpBackOff(FluentBackoff.DEFAULT.backoff());
GoogleJsonResponseException expectedException =
googleJsonResponseException(
HttpStatusCodes.STATUS_CODE_FORBIDDEN,
"Waves hand mysteriously",
"These aren't the buckets you're looking for");
when(mockStorage.buckets()).thenReturn(mockStorageObjects);
when(mockStorageObjects.insert(any(String.class), any(Bucket.class)))
.thenReturn(mockStorageInsert);
when(mockStorageInsert.execute()).thenThrow(expectedException);
thrown.expect(AccessDeniedException.class);
gcsUtil.createBucket("a", new Bucket(), mockBackOff, new FastNanoClockAndSleeper());
}
@Test
public void testBucketAccessible() throws IOException {
GcsOptions pipelineOptions = gcsOptionsWithTestCredential();
GcsUtil gcsUtil = pipelineOptions.getGcsUtil();
Storage mockStorage = Mockito.mock(Storage.class);
gcsUtil.setStorageClient(mockStorage);
Storage.Buckets mockStorageObjects = Mockito.mock(Storage.Buckets.class);
Storage.Buckets.Get mockStorageGet = Mockito.mock(Storage.Buckets.Get.class);
BackOff mockBackOff = BackOffAdapter.toGcpBackOff(FluentBackoff.DEFAULT.backoff());
when(mockStorage.buckets()).thenReturn(mockStorageObjects);
when(mockStorageObjects.get("testbucket")).thenReturn(mockStorageGet);
when(mockStorageGet.execute())
.thenThrow(new SocketTimeoutException("SocketException"))
.thenReturn(new Bucket());
assertTrue(
gcsUtil.bucketAccessible(
GcsPath.fromComponents("testbucket", "testobject"),
mockBackOff,
new FastNanoClockAndSleeper()));
}
@Test
public void testBucketDoesNotExistBecauseOfAccessError() throws IOException {
GcsOptions pipelineOptions = gcsOptionsWithTestCredential();
GcsUtil gcsUtil = pipelineOptions.getGcsUtil();
Storage mockStorage = Mockito.mock(Storage.class);
gcsUtil.setStorageClient(mockStorage);
Storage.Buckets mockStorageObjects = Mockito.mock(Storage.Buckets.class);
Storage.Buckets.Get mockStorageGet = Mockito.mock(Storage.Buckets.Get.class);
BackOff mockBackOff = BackOffAdapter.toGcpBackOff(FluentBackoff.DEFAULT.backoff());
GoogleJsonResponseException expectedException =
googleJsonResponseException(
HttpStatusCodes.STATUS_CODE_FORBIDDEN,
"Waves hand mysteriously",
"These aren't the buckets you're looking for");
when(mockStorage.buckets()).thenReturn(mockStorageObjects);
when(mockStorageObjects.get("testbucket")).thenReturn(mockStorageGet);
when(mockStorageGet.execute()).thenThrow(expectedException);
assertFalse(
gcsUtil.bucketAccessible(
GcsPath.fromComponents("testbucket", "testobject"),
mockBackOff,
new FastNanoClockAndSleeper()));
}
@Test
public void testBucketDoesNotExist() throws IOException {
GcsOptions pipelineOptions = gcsOptionsWithTestCredential();
GcsUtil gcsUtil = pipelineOptions.getGcsUtil();
Storage mockStorage = Mockito.mock(Storage.class);
gcsUtil.setStorageClient(mockStorage);
Storage.Buckets mockStorageObjects = Mockito.mock(Storage.Buckets.class);
Storage.Buckets.Get mockStorageGet = Mockito.mock(Storage.Buckets.Get.class);
BackOff mockBackOff = BackOffAdapter.toGcpBackOff(FluentBackoff.DEFAULT.backoff());
when(mockStorage.buckets()).thenReturn(mockStorageObjects);
when(mockStorageObjects.get("testbucket")).thenReturn(mockStorageGet);
when(mockStorageGet.execute())
.thenThrow(
googleJsonResponseException(
HttpStatusCodes.STATUS_CODE_NOT_FOUND, "It don't exist", "Nothing here to see"));
assertFalse(
gcsUtil.bucketAccessible(
GcsPath.fromComponents("testbucket", "testobject"),
mockBackOff,
new FastNanoClockAndSleeper()));
}
@Test
public void testGetBucket() throws IOException {
GcsOptions pipelineOptions = gcsOptionsWithTestCredential();
GcsUtil gcsUtil = pipelineOptions.getGcsUtil();
Storage mockStorage = Mockito.mock(Storage.class);
gcsUtil.setStorageClient(mockStorage);
Storage.Buckets mockStorageObjects = Mockito.mock(Storage.Buckets.class);
Storage.Buckets.Get mockStorageGet = Mockito.mock(Storage.Buckets.Get.class);
BackOff mockBackOff = BackOffAdapter.toGcpBackOff(FluentBackoff.DEFAULT.backoff());
when(mockStorage.buckets()).thenReturn(mockStorageObjects);
when(mockStorageObjects.get("testbucket")).thenReturn(mockStorageGet);
when(mockStorageGet.execute())
.thenThrow(new SocketTimeoutException("SocketException"))
.thenReturn(new Bucket());
assertNotNull(
gcsUtil.getBucket(
GcsPath.fromComponents("testbucket", "testobject"),
mockBackOff,
new FastNanoClockAndSleeper()));
}
@Test
public void testGetBucketNotExists() throws IOException {
GcsOptions pipelineOptions = gcsOptionsWithTestCredential();
GcsUtil gcsUtil = pipelineOptions.getGcsUtil();
Storage mockStorage = Mockito.mock(Storage.class);
gcsUtil.setStorageClient(mockStorage);
Storage.Buckets mockStorageObjects = Mockito.mock(Storage.Buckets.class);
Storage.Buckets.Get mockStorageGet = Mockito.mock(Storage.Buckets.Get.class);
BackOff mockBackOff = BackOffAdapter.toGcpBackOff(FluentBackoff.DEFAULT.backoff());
when(mockStorage.buckets()).thenReturn(mockStorageObjects);
when(mockStorageObjects.get("testbucket")).thenReturn(mockStorageGet);
when(mockStorageGet.execute())
.thenThrow(
googleJsonResponseException(
HttpStatusCodes.STATUS_CODE_NOT_FOUND, "It don't exist", "Nothing here to see"));
thrown.expect(FileNotFoundException.class);
thrown.expectMessage("It don't exist");
gcsUtil.getBucket(
GcsPath.fromComponents("testbucket", "testobject"),
mockBackOff,
new FastNanoClockAndSleeper());
}
@Test
public void testGCSChannelCloseIdempotent() throws IOException {
GoogleCloudStorageReadOptions readOptions =
GoogleCloudStorageReadOptions.builder().setFastFailOnNotFound(false).build();
SeekableByteChannel channel =
new GoogleCloudStorageReadChannel(
null, "dummybucket", "dummyobject", null, new ClientRequestHelper<>(), readOptions);
channel.close();
channel.close();
}
/** Builds a fake GoogleJsonResponseException for testing API error handling. */
private static GoogleJsonResponseException googleJsonResponseException(
final int status, final String reason, final String message) throws IOException {
final JsonFactory jsonFactory = new JacksonFactory();
HttpTransport transport =
new MockHttpTransport() {
@Override
public LowLevelHttpRequest buildRequest(String method, String url) throws IOException {
ErrorInfo errorInfo = new ErrorInfo();
errorInfo.setReason(reason);
errorInfo.setMessage(message);
errorInfo.setFactory(jsonFactory);
GenericJson error = new GenericJson();
error.set("code", status);
error.set("errors", Arrays.asList(errorInfo));
error.setFactory(jsonFactory);
GenericJson errorResponse = new GenericJson();
errorResponse.set("error", error);
errorResponse.setFactory(jsonFactory);
return new MockLowLevelHttpRequest()
.setResponse(
new MockLowLevelHttpResponse()
.setContent(errorResponse.toPrettyString())
.setContentType(Json.MEDIA_TYPE)
.setStatusCode(status));
}
};
HttpRequest request =
transport.createRequestFactory().buildGetRequest(HttpTesting.SIMPLE_GENERIC_URL);
request.setThrowExceptionOnExecuteError(false);
HttpResponse response = request.execute();
return GoogleJsonResponseException.from(jsonFactory, response);
}
private static List<String> makeStrings(String s, int n) {
ImmutableList.Builder<String> ret = ImmutableList.builder();
for (int i = 0; i < n; ++i) {
ret.add(String.format("gs://bucket/%s%d", s, i));
}
return ret.build();
}
private static List<GcsPath> makeGcsPaths(String s, int n) {
ImmutableList.Builder<GcsPath> ret = ImmutableList.builder();
for (int i = 0; i < n; ++i) {
ret.add(GcsPath.fromUri(String.format("gs://bucket/%s%d", s, i)));
}
return ret.build();
}
private static int sumBatchSizes(List<BatchRequest> batches) {
int ret = 0;
for (BatchRequest b : batches) {
ret += b.size();
assertThat(b.size(), greaterThan(0));
}
return ret;
}
@Test
public void testMakeRewriteOps() throws IOException {
GcsOptions gcsOptions = gcsOptionsWithTestCredential();
GcsUtil gcsUtil = gcsOptions.getGcsUtil();
LinkedList<RewriteOp> rewrites =
gcsUtil.makeRewriteOps(makeStrings("s", 1), makeStrings("d", 1));
assertEquals(1, rewrites.size());
RewriteOp rewrite = rewrites.pop();
assertTrue(rewrite.getReadyToEnqueue());
Storage.Objects.Rewrite request = rewrite.rewriteRequest;
assertNull(request.getMaxBytesRewrittenPerCall());
assertEquals("bucket", request.getSourceBucket());
assertEquals("s0", request.getSourceObject());
assertEquals("bucket", request.getDestinationBucket());
assertEquals("d0", request.getDestinationObject());
}
@Test
public void testMakeRewriteOpsWithOptions() throws IOException {
GcsOptions gcsOptions = gcsOptionsWithTestCredential();
GcsUtil gcsUtil = gcsOptions.getGcsUtil();
gcsUtil.maxBytesRewrittenPerCall = 1337L;
LinkedList<RewriteOp> rewrites =
gcsUtil.makeRewriteOps(makeStrings("s", 1), makeStrings("d", 1));
assertEquals(1, rewrites.size());
RewriteOp rewrite = rewrites.pop();
assertTrue(rewrite.getReadyToEnqueue());
Storage.Objects.Rewrite request = rewrite.rewriteRequest;
assertEquals(Long.valueOf(1337L), request.getMaxBytesRewrittenPerCall());
}
@Test
public void testMakeCopyBatches() throws IOException {
GcsUtil gcsUtil = gcsOptionsWithTestCredential().getGcsUtil();
// Small number of files fits in 1 batch
List<BatchRequest> batches =
gcsUtil.makeCopyBatches(gcsUtil.makeRewriteOps(makeStrings("s", 3), makeStrings("d", 3)));
assertThat(batches.size(), equalTo(1));
assertThat(sumBatchSizes(batches), equalTo(3));
// 1 batch of files fits in 1 batch
batches =
gcsUtil.makeCopyBatches(
gcsUtil.makeRewriteOps(makeStrings("s", 100), makeStrings("d", 100)));
assertThat(batches.size(), equalTo(1));
assertThat(sumBatchSizes(batches), equalTo(100));
// A little more than 5 batches of files fits in 6 batches
batches =
gcsUtil.makeCopyBatches(
gcsUtil.makeRewriteOps(makeStrings("s", 501), makeStrings("d", 501)));
assertThat(batches.size(), equalTo(6));
assertThat(sumBatchSizes(batches), equalTo(501));
}
@Test
public void testMakeRewriteOpsInvalid() throws IOException {
GcsUtil gcsUtil = gcsOptionsWithTestCredential().getGcsUtil();
thrown.expect(IllegalArgumentException.class);
thrown.expectMessage("Number of source files 3");
gcsUtil.makeRewriteOps(makeStrings("s", 3), makeStrings("d", 1));
}
@Test
public void testMakeRemoveBatches() throws IOException {
GcsUtil gcsUtil = gcsOptionsWithTestCredential().getGcsUtil();
// Small number of files fits in 1 batch
List<BatchRequest> batches = gcsUtil.makeRemoveBatches(makeStrings("s", 3));
assertThat(batches.size(), equalTo(1));
assertThat(sumBatchSizes(batches), equalTo(3));
// 1 batch of files fits in 1 batch
batches = gcsUtil.makeRemoveBatches(makeStrings("s", 100));
assertThat(batches.size(), equalTo(1));
assertThat(sumBatchSizes(batches), equalTo(100));
// A little more than 5 batches of files fits in 6 batches
batches = gcsUtil.makeRemoveBatches(makeStrings("s", 501));
assertThat(batches.size(), equalTo(6));
assertThat(sumBatchSizes(batches), equalTo(501));
}
@Test
public void testMakeGetBatches() throws IOException {
GcsUtil gcsUtil = gcsOptionsWithTestCredential().getGcsUtil();
// Small number of files fits in 1 batch
List<StorageObjectOrIOException[]> results = Lists.newArrayList();
List<BatchRequest> batches = gcsUtil.makeGetBatches(makeGcsPaths("s", 3), results);
assertThat(batches.size(), equalTo(1));
assertThat(sumBatchSizes(batches), equalTo(3));
assertEquals(3, results.size());
// 1 batch of files fits in 1 batch
results = Lists.newArrayList();
batches = gcsUtil.makeGetBatches(makeGcsPaths("s", 100), results);
assertThat(batches.size(), equalTo(1));
assertThat(sumBatchSizes(batches), equalTo(100));
assertEquals(100, results.size());
// A little more than 5 batches of files fits in 6 batches
results = Lists.newArrayList();
batches = gcsUtil.makeGetBatches(makeGcsPaths("s", 501), results);
assertThat(batches.size(), equalTo(6));
assertThat(sumBatchSizes(batches), equalTo(501));
assertEquals(501, results.size());
}
/** A helper to wrap a {@link GenericJson} object in a content stream. */
private static InputStream toStream(String content) throws IOException {
return new ByteArrayInputStream(content.getBytes(StandardCharsets.UTF_8));
}
}