| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.beam.sdk.extensions.gcp.util; |
| |
| import static org.hamcrest.Matchers.contains; |
| import static org.hamcrest.Matchers.equalTo; |
| import static org.hamcrest.Matchers.greaterThan; |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertFalse; |
| import static org.junit.Assert.assertNotNull; |
| import static org.junit.Assert.assertNull; |
| import static org.junit.Assert.assertSame; |
| import static org.junit.Assert.assertThat; |
| import static org.junit.Assert.assertTrue; |
| import static org.mockito.Matchers.any; |
| import static org.mockito.Mockito.when; |
| |
| import com.google.api.client.googleapis.batch.BatchRequest; |
| import com.google.api.client.googleapis.json.GoogleJsonError.ErrorInfo; |
| import com.google.api.client.googleapis.json.GoogleJsonResponseException; |
| import com.google.api.client.http.HttpRequest; |
| import com.google.api.client.http.HttpResponse; |
| import com.google.api.client.http.HttpStatusCodes; |
| import com.google.api.client.http.HttpTransport; |
| import com.google.api.client.http.LowLevelHttpRequest; |
| import com.google.api.client.http.LowLevelHttpResponse; |
| import com.google.api.client.json.GenericJson; |
| import com.google.api.client.json.Json; |
| import com.google.api.client.json.JsonFactory; |
| import com.google.api.client.json.jackson2.JacksonFactory; |
| import com.google.api.client.testing.http.HttpTesting; |
| import com.google.api.client.testing.http.MockHttpTransport; |
| import com.google.api.client.testing.http.MockLowLevelHttpRequest; |
| import com.google.api.client.testing.http.MockLowLevelHttpResponse; |
| import com.google.api.client.util.BackOff; |
| import com.google.api.services.storage.Storage; |
| import com.google.api.services.storage.model.Bucket; |
| import com.google.api.services.storage.model.Objects; |
| import com.google.api.services.storage.model.StorageObject; |
| import com.google.cloud.hadoop.gcsio.GoogleCloudStorageReadChannel; |
| import com.google.cloud.hadoop.gcsio.GoogleCloudStorageReadOptions; |
| import com.google.cloud.hadoop.util.ClientRequestHelper; |
| import java.io.ByteArrayInputStream; |
| import java.io.FileNotFoundException; |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.math.BigInteger; |
| import java.net.SocketTimeoutException; |
| import java.nio.channels.SeekableByteChannel; |
| import java.nio.charset.StandardCharsets; |
| import java.nio.file.AccessDeniedException; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collections; |
| import java.util.LinkedList; |
| import java.util.List; |
| import java.util.concurrent.CountDownLatch; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.TimeUnit; |
| import org.apache.beam.sdk.extensions.gcp.auth.TestCredential; |
| import org.apache.beam.sdk.extensions.gcp.options.GcsOptions; |
| import org.apache.beam.sdk.extensions.gcp.util.GcsUtil.RewriteOp; |
| import org.apache.beam.sdk.extensions.gcp.util.GcsUtil.StorageObjectOrIOException; |
| import org.apache.beam.sdk.extensions.gcp.util.gcsfs.GcsPath; |
| import org.apache.beam.sdk.options.PipelineOptionsFactory; |
| import org.apache.beam.sdk.util.FluentBackoff; |
| import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; |
| import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists; |
| import org.junit.Rule; |
| import org.junit.Test; |
| import org.junit.rules.ExpectedException; |
| import org.junit.runner.RunWith; |
| import org.junit.runners.JUnit4; |
| import org.mockito.Mockito; |
| |
| /** Test case for {@link GcsUtil}. */ |
| @RunWith(JUnit4.class) |
| public class GcsUtilTest { |
| @Rule public ExpectedException thrown = ExpectedException.none(); |
| |
| @Test |
| public void testGlobTranslation() { |
| assertEquals("foo", GcsUtil.wildcardToRegexp("foo")); |
| assertEquals("fo[^/]*o", GcsUtil.wildcardToRegexp("fo*o")); |
| assertEquals("f[^/]*o\\.[^/]", GcsUtil.wildcardToRegexp("f*o.?")); |
| assertEquals("foo-[0-9][^/]*", GcsUtil.wildcardToRegexp("foo-[0-9]*")); |
| assertEquals("foo-[0-9].*", GcsUtil.wildcardToRegexp("foo-[0-9]**")); |
| assertEquals(".*foo", GcsUtil.wildcardToRegexp("**/*foo")); |
| assertEquals(".*foo", GcsUtil.wildcardToRegexp("**foo")); |
| assertEquals("foo/[^/]*", GcsUtil.wildcardToRegexp("foo/*")); |
| assertEquals("foo[^/]*", GcsUtil.wildcardToRegexp("foo*")); |
| assertEquals("foo/[^/]*/[^/]*/[^/]*", GcsUtil.wildcardToRegexp("foo/*/*/*")); |
| assertEquals("foo/[^/]*/.*", GcsUtil.wildcardToRegexp("foo/*/**")); |
| assertEquals("foo.*baz", GcsUtil.wildcardToRegexp("foo**baz")); |
| } |
| |
| private static GcsOptions gcsOptionsWithTestCredential() { |
| GcsOptions pipelineOptions = PipelineOptionsFactory.as(GcsOptions.class); |
| pipelineOptions.setGcpCredential(new TestCredential()); |
| return pipelineOptions; |
| } |
| |
| @Test |
| public void testCreationWithDefaultOptions() { |
| GcsOptions pipelineOptions = gcsOptionsWithTestCredential(); |
| assertNotNull(pipelineOptions.getGcpCredential()); |
| } |
| |
| @Test |
| public void testUploadBufferSizeDefault() { |
| GcsOptions pipelineOptions = gcsOptionsWithTestCredential(); |
| GcsUtil util = pipelineOptions.getGcsUtil(); |
| assertNull(util.getUploadBufferSizeBytes()); |
| } |
| |
| @Test |
| public void testUploadBufferSizeUserSpecified() { |
| GcsOptions pipelineOptions = gcsOptionsWithTestCredential(); |
| pipelineOptions.setGcsUploadBufferSizeBytes(12345); |
| GcsUtil util = pipelineOptions.getGcsUtil(); |
| assertEquals((Integer) 12345, util.getUploadBufferSizeBytes()); |
| } |
| |
| @Test |
| public void testCreationWithExecutorServiceProvided() { |
| GcsOptions pipelineOptions = gcsOptionsWithTestCredential(); |
| pipelineOptions.setExecutorService(Executors.newCachedThreadPool()); |
| assertSame(pipelineOptions.getExecutorService(), pipelineOptions.getGcsUtil().executorService); |
| } |
| |
| @Test |
| public void testCreationWithGcsUtilProvided() { |
| GcsOptions pipelineOptions = PipelineOptionsFactory.as(GcsOptions.class); |
| GcsUtil gcsUtil = Mockito.mock(GcsUtil.class); |
| pipelineOptions.setGcsUtil(gcsUtil); |
| assertSame(gcsUtil, pipelineOptions.getGcsUtil()); |
| } |
| |
| @Test |
| public void testMultipleThreadsCanCompleteOutOfOrderWithDefaultThreadPool() throws Exception { |
| GcsOptions pipelineOptions = PipelineOptionsFactory.as(GcsOptions.class); |
| ExecutorService executorService = pipelineOptions.getExecutorService(); |
| |
| int numThreads = 100; |
| final CountDownLatch[] countDownLatches = new CountDownLatch[numThreads]; |
| for (int i = 0; i < numThreads; i++) { |
| final int currentLatch = i; |
| countDownLatches[i] = new CountDownLatch(1); |
| executorService.execute( |
| () -> { |
| // Wait for latch N and then release latch N - 1 |
| try { |
| countDownLatches[currentLatch].await(); |
| if (currentLatch > 0) { |
| countDownLatches[currentLatch - 1].countDown(); |
| } |
| } catch (InterruptedException e) { |
| Thread.currentThread().interrupt(); |
| throw new RuntimeException(e); |
| } |
| }); |
| } |
| |
| // Release the last latch starting the chain reaction. |
| countDownLatches[countDownLatches.length - 1].countDown(); |
| executorService.shutdown(); |
| assertTrue( |
| "Expected tasks to complete", executorService.awaitTermination(10, TimeUnit.SECONDS)); |
| } |
| |
| @Test |
| public void testGlobExpansion() throws IOException { |
| GcsOptions pipelineOptions = gcsOptionsWithTestCredential(); |
| GcsUtil gcsUtil = pipelineOptions.getGcsUtil(); |
| |
| Storage mockStorage = Mockito.mock(Storage.class); |
| gcsUtil.setStorageClient(mockStorage); |
| |
| Storage.Objects mockStorageObjects = Mockito.mock(Storage.Objects.class); |
| Storage.Objects.Get mockStorageGet = Mockito.mock(Storage.Objects.Get.class); |
| Storage.Objects.List mockStorageList = Mockito.mock(Storage.Objects.List.class); |
| |
| Objects modelObjects = new Objects(); |
| List<StorageObject> items = new ArrayList<>(); |
| // A directory |
| items.add(new StorageObject().setBucket("testbucket").setName("testdirectory/")); |
| |
| // Files within the directory |
| items.add(new StorageObject().setBucket("testbucket").setName("testdirectory/file1name")); |
| items.add(new StorageObject().setBucket("testbucket").setName("testdirectory/file2name")); |
| items.add(new StorageObject().setBucket("testbucket").setName("testdirectory/file3name")); |
| items.add(new StorageObject().setBucket("testbucket").setName("testdirectory/otherfile")); |
| items.add(new StorageObject().setBucket("testbucket").setName("testdirectory/anotherfile")); |
| |
| modelObjects.setItems(items); |
| |
| when(mockStorage.objects()).thenReturn(mockStorageObjects); |
| when(mockStorageObjects.get("testbucket", "testdirectory/otherfile")) |
| .thenReturn(mockStorageGet); |
| when(mockStorageObjects.list("testbucket")).thenReturn(mockStorageList); |
| when(mockStorageGet.execute()) |
| .thenReturn(new StorageObject().setBucket("testbucket").setName("testdirectory/otherfile")); |
| when(mockStorageList.execute()).thenReturn(modelObjects); |
| |
| // Test a single file. |
| { |
| GcsPath pattern = GcsPath.fromUri("gs://testbucket/testdirectory/otherfile"); |
| List<GcsPath> expectedFiles = |
| ImmutableList.of(GcsPath.fromUri("gs://testbucket/testdirectory/otherfile")); |
| |
| assertThat(expectedFiles, contains(gcsUtil.expand(pattern).toArray())); |
| } |
| |
| // Test patterns. |
| { |
| GcsPath pattern = GcsPath.fromUri("gs://testbucket/testdirectory/file*"); |
| List<GcsPath> expectedFiles = |
| ImmutableList.of( |
| GcsPath.fromUri("gs://testbucket/testdirectory/file1name"), |
| GcsPath.fromUri("gs://testbucket/testdirectory/file2name"), |
| GcsPath.fromUri("gs://testbucket/testdirectory/file3name")); |
| |
| assertThat(expectedFiles, contains(gcsUtil.expand(pattern).toArray())); |
| } |
| |
| { |
| GcsPath pattern = GcsPath.fromUri("gs://testbucket/testdirectory/file[1-3]*"); |
| List<GcsPath> expectedFiles = |
| ImmutableList.of( |
| GcsPath.fromUri("gs://testbucket/testdirectory/file1name"), |
| GcsPath.fromUri("gs://testbucket/testdirectory/file2name"), |
| GcsPath.fromUri("gs://testbucket/testdirectory/file3name")); |
| |
| assertThat(expectedFiles, contains(gcsUtil.expand(pattern).toArray())); |
| } |
| |
| { |
| GcsPath pattern = GcsPath.fromUri("gs://testbucket/testdirectory/file?name"); |
| List<GcsPath> expectedFiles = |
| ImmutableList.of( |
| GcsPath.fromUri("gs://testbucket/testdirectory/file1name"), |
| GcsPath.fromUri("gs://testbucket/testdirectory/file2name"), |
| GcsPath.fromUri("gs://testbucket/testdirectory/file3name")); |
| |
| assertThat(expectedFiles, contains(gcsUtil.expand(pattern).toArray())); |
| } |
| |
| { |
| GcsPath pattern = GcsPath.fromUri("gs://testbucket/test*ectory/fi*name"); |
| List<GcsPath> expectedFiles = |
| ImmutableList.of( |
| GcsPath.fromUri("gs://testbucket/testdirectory/file1name"), |
| GcsPath.fromUri("gs://testbucket/testdirectory/file2name"), |
| GcsPath.fromUri("gs://testbucket/testdirectory/file3name")); |
| |
| assertThat(expectedFiles, contains(gcsUtil.expand(pattern).toArray())); |
| } |
| } |
| |
| @Test |
| public void testRecursiveGlobExpansion() throws IOException { |
| GcsOptions pipelineOptions = gcsOptionsWithTestCredential(); |
| GcsUtil gcsUtil = pipelineOptions.getGcsUtil(); |
| |
| Storage mockStorage = Mockito.mock(Storage.class); |
| gcsUtil.setStorageClient(mockStorage); |
| |
| Storage.Objects mockStorageObjects = Mockito.mock(Storage.Objects.class); |
| Storage.Objects.Get mockStorageGet = Mockito.mock(Storage.Objects.Get.class); |
| Storage.Objects.List mockStorageList = Mockito.mock(Storage.Objects.List.class); |
| |
| Objects modelObjects = new Objects(); |
| List<StorageObject> items = new ArrayList<>(); |
| // A directory |
| items.add(new StorageObject().setBucket("testbucket").setName("testdirectory/")); |
| |
| // Files within the directory |
| items.add(new StorageObject().setBucket("testbucket").setName("test/directory/file1.txt")); |
| items.add(new StorageObject().setBucket("testbucket").setName("test/directory/file2.txt")); |
| items.add(new StorageObject().setBucket("testbucket").setName("test/directory/file3.txt")); |
| items.add(new StorageObject().setBucket("testbucket").setName("test/directory/otherfile")); |
| items.add(new StorageObject().setBucket("testbucket").setName("test/directory/anotherfile")); |
| items.add(new StorageObject().setBucket("testbucket").setName("test/file4.txt")); |
| |
| modelObjects.setItems(items); |
| |
| when(mockStorage.objects()).thenReturn(mockStorageObjects); |
| when(mockStorageObjects.get("testbucket", "test/directory/otherfile")) |
| .thenReturn(mockStorageGet); |
| when(mockStorageObjects.list("testbucket")).thenReturn(mockStorageList); |
| when(mockStorageGet.execute()) |
| .thenReturn( |
| new StorageObject().setBucket("testbucket").setName("test/directory/otherfile")); |
| when(mockStorageList.execute()).thenReturn(modelObjects); |
| |
| { |
| GcsPath pattern = GcsPath.fromUri("gs://testbucket/test/**/*.txt"); |
| List<GcsPath> expectedFiles = |
| ImmutableList.of( |
| GcsPath.fromUri("gs://testbucket/test/directory/file1.txt"), |
| GcsPath.fromUri("gs://testbucket/test/directory/file2.txt"), |
| GcsPath.fromUri("gs://testbucket/test/directory/file3.txt"), |
| GcsPath.fromUri("gs://testbucket/test/file4.txt")); |
| |
| assertThat(expectedFiles, contains(gcsUtil.expand(pattern).toArray())); |
| } |
| } |
| |
// GcsUtil.expand() should not fail when matching a single object that does not exist. GCS object
// GETs are strongly consistent, so it should return an empty result instead.
| @Test |
| public void testNonExistentObjectReturnsEmptyResult() throws IOException { |
| GcsOptions pipelineOptions = gcsOptionsWithTestCredential(); |
| GcsUtil gcsUtil = pipelineOptions.getGcsUtil(); |
| |
| Storage mockStorage = Mockito.mock(Storage.class); |
| gcsUtil.setStorageClient(mockStorage); |
| |
| Storage.Objects mockStorageObjects = Mockito.mock(Storage.Objects.class); |
| Storage.Objects.Get mockStorageGet = Mockito.mock(Storage.Objects.Get.class); |
| |
| GcsPath pattern = GcsPath.fromUri("gs://testbucket/testdirectory/nonexistentfile"); |
| GoogleJsonResponseException expectedException = |
| googleJsonResponseException( |
| HttpStatusCodes.STATUS_CODE_NOT_FOUND, "It don't exist", "Nothing here to see"); |
| |
| when(mockStorage.objects()).thenReturn(mockStorageObjects); |
| when(mockStorageObjects.get(pattern.getBucket(), pattern.getObject())) |
| .thenReturn(mockStorageGet); |
| when(mockStorageGet.execute()).thenThrow(expectedException); |
| |
| assertEquals(Collections.emptyList(), gcsUtil.expand(pattern)); |
| } |
| |
// GcsUtil.expand() should throw an IOException for other errors, such as access denied.
| @Test |
| public void testAccessDeniedObjectThrowsIOException() throws IOException { |
| GcsOptions pipelineOptions = gcsOptionsWithTestCredential(); |
| GcsUtil gcsUtil = pipelineOptions.getGcsUtil(); |
| |
| Storage mockStorage = Mockito.mock(Storage.class); |
| gcsUtil.setStorageClient(mockStorage); |
| |
| Storage.Objects mockStorageObjects = Mockito.mock(Storage.Objects.class); |
| Storage.Objects.Get mockStorageGet = Mockito.mock(Storage.Objects.Get.class); |
| |
| GcsPath pattern = GcsPath.fromUri("gs://testbucket/testdirectory/accessdeniedfile"); |
| GoogleJsonResponseException expectedException = |
| googleJsonResponseException( |
| HttpStatusCodes.STATUS_CODE_FORBIDDEN, |
| "Waves hand mysteriously", |
| "These aren't the buckets you're looking for"); |
| |
| when(mockStorage.objects()).thenReturn(mockStorageObjects); |
| when(mockStorageObjects.get(pattern.getBucket(), pattern.getObject())) |
| .thenReturn(mockStorageGet); |
| when(mockStorageGet.execute()).thenThrow(expectedException); |
| |
| thrown.expect(IOException.class); |
| thrown.expectMessage("Unable to get the file object for path"); |
| gcsUtil.expand(pattern); |
| } |
| |
| @Test |
| public void testFileSizeNonBatch() throws Exception { |
| GcsOptions pipelineOptions = gcsOptionsWithTestCredential(); |
| GcsUtil gcsUtil = pipelineOptions.getGcsUtil(); |
| |
| Storage mockStorage = Mockito.mock(Storage.class); |
| gcsUtil.setStorageClient(mockStorage); |
| |
| Storage.Objects mockStorageObjects = Mockito.mock(Storage.Objects.class); |
| Storage.Objects.Get mockStorageGet = Mockito.mock(Storage.Objects.Get.class); |
| |
| when(mockStorage.objects()).thenReturn(mockStorageObjects); |
| when(mockStorageObjects.get("testbucket", "testobject")).thenReturn(mockStorageGet); |
| when(mockStorageGet.execute()) |
| .thenReturn(new StorageObject().setSize(BigInteger.valueOf(1000))); |
| |
| assertEquals(1000, gcsUtil.fileSize(GcsPath.fromComponents("testbucket", "testobject"))); |
| } |
| |
| @Test |
| public void testFileSizeWhenFileNotFoundNonBatch() throws Exception { |
| MockLowLevelHttpResponse notFoundResponse = new MockLowLevelHttpResponse(); |
| notFoundResponse.setContent(""); |
| notFoundResponse.setStatusCode(HttpStatusCodes.STATUS_CODE_NOT_FOUND); |
| |
| MockHttpTransport mockTransport = |
| new MockHttpTransport.Builder().setLowLevelHttpResponse(notFoundResponse).build(); |
| |
| GcsOptions pipelineOptions = gcsOptionsWithTestCredential(); |
| GcsUtil gcsUtil = pipelineOptions.getGcsUtil(); |
| |
| gcsUtil.setStorageClient(new Storage(mockTransport, Transport.getJsonFactory(), null)); |
| |
| thrown.expect(FileNotFoundException.class); |
| gcsUtil.fileSize(GcsPath.fromComponents("testbucket", "testobject")); |
| } |
| |
| @Test |
| public void testRetryFileSizeNonBatch() throws IOException { |
| GcsOptions pipelineOptions = gcsOptionsWithTestCredential(); |
| GcsUtil gcsUtil = pipelineOptions.getGcsUtil(); |
| |
| Storage mockStorage = Mockito.mock(Storage.class); |
| gcsUtil.setStorageClient(mockStorage); |
| |
| Storage.Objects mockStorageObjects = Mockito.mock(Storage.Objects.class); |
| Storage.Objects.Get mockStorageGet = Mockito.mock(Storage.Objects.Get.class); |
| |
| BackOff mockBackOff = |
| BackOffAdapter.toGcpBackOff(FluentBackoff.DEFAULT.withMaxRetries(2).backoff()); |
| |
| when(mockStorage.objects()).thenReturn(mockStorageObjects); |
| when(mockStorageObjects.get("testbucket", "testobject")).thenReturn(mockStorageGet); |
| when(mockStorageGet.execute()) |
| .thenThrow(new SocketTimeoutException("SocketException")) |
| .thenThrow(new SocketTimeoutException("SocketException")) |
| .thenReturn(new StorageObject().setSize(BigInteger.valueOf(1000))); |
| |
| assertEquals( |
| 1000, |
| gcsUtil |
| .getObject( |
| GcsPath.fromComponents("testbucket", "testobject"), |
| mockBackOff, |
| new FastNanoClockAndSleeper()) |
| .getSize() |
| .longValue()); |
| assertEquals(BackOff.STOP, mockBackOff.nextBackOffMillis()); |
| } |
| |
| @Test |
| public void testGetSizeBytesWhenFileNotFoundBatch() throws Exception { |
| JsonFactory jsonFactory = new JacksonFactory(); |
| |
| String contentBoundary = "batch_foobarbaz"; |
| String contentBoundaryLine = "--" + contentBoundary; |
| String endOfContentBoundaryLine = "--" + contentBoundary + "--"; |
| |
| GenericJson error = new GenericJson().set("error", new GenericJson().set("code", 404)); |
| error.setFactory(jsonFactory); |
| |
| String content = |
| contentBoundaryLine |
| + "\n" |
| + "Content-Type: application/http\n" |
| + "\n" |
| + "HTTP/1.1 404 Not Found\n" |
| + "Content-Length: -1\n" |
| + "\n" |
| + error.toString() |
| + "\n" |
| + "\n" |
| + endOfContentBoundaryLine |
| + "\n"; |
| thrown.expect(FileNotFoundException.class); |
| MockLowLevelHttpResponse notFoundResponse = |
| new MockLowLevelHttpResponse() |
| .setContentType("multipart/mixed; boundary=" + contentBoundary) |
| .setContent(content) |
| .setStatusCode(HttpStatusCodes.STATUS_CODE_OK); |
| |
| MockHttpTransport mockTransport = |
| new MockHttpTransport.Builder().setLowLevelHttpResponse(notFoundResponse).build(); |
| |
| GcsUtil gcsUtil = gcsOptionsWithTestCredential().getGcsUtil(); |
| |
| gcsUtil.setStorageClient(new Storage(mockTransport, Transport.getJsonFactory(), null)); |
| gcsUtil.fileSizes(ImmutableList.of(GcsPath.fromComponents("testbucket", "testobject"))); |
| } |
| |
| @Test |
| public void testGetSizeBytesWhenFileNotFoundBatchRetry() throws Exception { |
| JsonFactory jsonFactory = new JacksonFactory(); |
| |
| String contentBoundary = "batch_foobarbaz"; |
| String contentBoundaryLine = "--" + contentBoundary; |
| String endOfContentBoundaryLine = "--" + contentBoundary + "--"; |
| |
| GenericJson error = new GenericJson().set("error", new GenericJson().set("code", 404)); |
| error.setFactory(jsonFactory); |
| |
| String content = |
| contentBoundaryLine |
| + "\n" |
| + "Content-Type: application/http\n" |
| + "\n" |
| + "HTTP/1.1 404 Not Found\n" |
| + "Content-Length: -1\n" |
| + "\n" |
| + error.toString() |
| + "\n" |
| + "\n" |
| + endOfContentBoundaryLine |
| + "\n"; |
| thrown.expect(FileNotFoundException.class); |
| |
| final LowLevelHttpResponse mockResponse = Mockito.mock(LowLevelHttpResponse.class); |
| when(mockResponse.getContentType()).thenReturn("multipart/mixed; boundary=" + contentBoundary); |
| |
| // 429: Too many requests, then 200: OK. |
| when(mockResponse.getStatusCode()).thenReturn(429, 200); |
| when(mockResponse.getContent()).thenReturn(toStream("error"), toStream(content)); |
| |
| // A mock transport that lets us mock the API responses. |
| MockHttpTransport mockTransport = |
| new MockHttpTransport.Builder() |
| .setLowLevelHttpRequest( |
| new MockLowLevelHttpRequest() { |
| @Override |
| public LowLevelHttpResponse execute() throws IOException { |
| return mockResponse; |
| } |
| }) |
| .build(); |
| |
| GcsUtil gcsUtil = gcsOptionsWithTestCredential().getGcsUtil(); |
| |
| gcsUtil.setStorageClient( |
| new Storage(mockTransport, Transport.getJsonFactory(), new RetryHttpRequestInitializer())); |
| gcsUtil.fileSizes(ImmutableList.of(GcsPath.fromComponents("testbucket", "testobject"))); |
| } |
| |
| @Test |
| public void testRemoveWhenFileNotFound() throws Exception { |
| JsonFactory jsonFactory = new JacksonFactory(); |
| |
| String contentBoundary = "batch_foobarbaz"; |
| String contentBoundaryLine = "--" + contentBoundary; |
| String endOfContentBoundaryLine = "--" + contentBoundary + "--"; |
| |
| GenericJson error = new GenericJson().set("error", new GenericJson().set("code", 404)); |
| error.setFactory(jsonFactory); |
| |
| String content = |
| contentBoundaryLine |
| + "\n" |
| + "Content-Type: application/http\n" |
| + "\n" |
| + "HTTP/1.1 404 Not Found\n" |
| + "Content-Length: -1\n" |
| + "\n" |
| + error.toString() |
| + "\n" |
| + "\n" |
| + endOfContentBoundaryLine |
| + "\n"; |
| |
| final LowLevelHttpResponse mockResponse = Mockito.mock(LowLevelHttpResponse.class); |
| when(mockResponse.getContentType()).thenReturn("multipart/mixed; boundary=" + contentBoundary); |
| when(mockResponse.getStatusCode()).thenReturn(200); |
| when(mockResponse.getContent()).thenReturn(toStream(content)); |
| |
| // A mock transport that lets us mock the API responses. |
| MockLowLevelHttpRequest request = |
| new MockLowLevelHttpRequest() { |
| @Override |
| public LowLevelHttpResponse execute() throws IOException { |
| return mockResponse; |
| } |
| }; |
| MockHttpTransport mockTransport = |
| new MockHttpTransport.Builder().setLowLevelHttpRequest(request).build(); |
| |
| GcsUtil gcsUtil = gcsOptionsWithTestCredential().getGcsUtil(); |
| gcsUtil.setStorageClient( |
| new Storage(mockTransport, Transport.getJsonFactory(), new RetryHttpRequestInitializer())); |
| gcsUtil.remove(Arrays.asList("gs://some-bucket/already-deleted")); |
| } |
| |
| @Test |
| public void testCreateBucket() throws IOException { |
| GcsOptions pipelineOptions = gcsOptionsWithTestCredential(); |
| GcsUtil gcsUtil = pipelineOptions.getGcsUtil(); |
| |
| Storage.Buckets mockStorageObjects = Mockito.mock(Storage.Buckets.class); |
| Storage mockStorage = Mockito.mock(Storage.class); |
| gcsUtil.setStorageClient(mockStorage); |
| |
| Storage.Buckets.Insert mockStorageInsert = Mockito.mock(Storage.Buckets.Insert.class); |
| |
| BackOff mockBackOff = BackOffAdapter.toGcpBackOff(FluentBackoff.DEFAULT.backoff()); |
| |
| when(mockStorage.buckets()).thenReturn(mockStorageObjects); |
| when(mockStorageObjects.insert(any(String.class), any(Bucket.class))) |
| .thenReturn(mockStorageInsert); |
| when(mockStorageInsert.execute()) |
| .thenThrow(new SocketTimeoutException("SocketException")) |
| .thenReturn(new Bucket()); |
| |
| gcsUtil.createBucket("a", new Bucket(), mockBackOff, new FastNanoClockAndSleeper()); |
| } |
| |
| @Test |
| public void testCreateBucketAccessErrors() throws IOException { |
| GcsOptions pipelineOptions = gcsOptionsWithTestCredential(); |
| GcsUtil gcsUtil = pipelineOptions.getGcsUtil(); |
| |
| Storage mockStorage = Mockito.mock(Storage.class); |
| gcsUtil.setStorageClient(mockStorage); |
| |
| Storage.Buckets mockStorageObjects = Mockito.mock(Storage.Buckets.class); |
| Storage.Buckets.Insert mockStorageInsert = Mockito.mock(Storage.Buckets.Insert.class); |
| |
| BackOff mockBackOff = BackOffAdapter.toGcpBackOff(FluentBackoff.DEFAULT.backoff()); |
| GoogleJsonResponseException expectedException = |
| googleJsonResponseException( |
| HttpStatusCodes.STATUS_CODE_FORBIDDEN, |
| "Waves hand mysteriously", |
| "These aren't the buckets you're looking for"); |
| |
| when(mockStorage.buckets()).thenReturn(mockStorageObjects); |
| when(mockStorageObjects.insert(any(String.class), any(Bucket.class))) |
| .thenReturn(mockStorageInsert); |
| when(mockStorageInsert.execute()).thenThrow(expectedException); |
| |
| thrown.expect(AccessDeniedException.class); |
| |
| gcsUtil.createBucket("a", new Bucket(), mockBackOff, new FastNanoClockAndSleeper()); |
| } |
| |
| @Test |
| public void testBucketAccessible() throws IOException { |
| GcsOptions pipelineOptions = gcsOptionsWithTestCredential(); |
| GcsUtil gcsUtil = pipelineOptions.getGcsUtil(); |
| |
| Storage mockStorage = Mockito.mock(Storage.class); |
| gcsUtil.setStorageClient(mockStorage); |
| |
| Storage.Buckets mockStorageObjects = Mockito.mock(Storage.Buckets.class); |
| Storage.Buckets.Get mockStorageGet = Mockito.mock(Storage.Buckets.Get.class); |
| |
| BackOff mockBackOff = BackOffAdapter.toGcpBackOff(FluentBackoff.DEFAULT.backoff()); |
| |
| when(mockStorage.buckets()).thenReturn(mockStorageObjects); |
| when(mockStorageObjects.get("testbucket")).thenReturn(mockStorageGet); |
| when(mockStorageGet.execute()) |
| .thenThrow(new SocketTimeoutException("SocketException")) |
| .thenReturn(new Bucket()); |
| |
| assertTrue( |
| gcsUtil.bucketAccessible( |
| GcsPath.fromComponents("testbucket", "testobject"), |
| mockBackOff, |
| new FastNanoClockAndSleeper())); |
| } |
| |
| @Test |
| public void testBucketDoesNotExistBecauseOfAccessError() throws IOException { |
| GcsOptions pipelineOptions = gcsOptionsWithTestCredential(); |
| GcsUtil gcsUtil = pipelineOptions.getGcsUtil(); |
| |
| Storage mockStorage = Mockito.mock(Storage.class); |
| gcsUtil.setStorageClient(mockStorage); |
| |
| Storage.Buckets mockStorageObjects = Mockito.mock(Storage.Buckets.class); |
| Storage.Buckets.Get mockStorageGet = Mockito.mock(Storage.Buckets.Get.class); |
| |
| BackOff mockBackOff = BackOffAdapter.toGcpBackOff(FluentBackoff.DEFAULT.backoff()); |
| GoogleJsonResponseException expectedException = |
| googleJsonResponseException( |
| HttpStatusCodes.STATUS_CODE_FORBIDDEN, |
| "Waves hand mysteriously", |
| "These aren't the buckets you're looking for"); |
| |
| when(mockStorage.buckets()).thenReturn(mockStorageObjects); |
| when(mockStorageObjects.get("testbucket")).thenReturn(mockStorageGet); |
| when(mockStorageGet.execute()).thenThrow(expectedException); |
| |
| assertFalse( |
| gcsUtil.bucketAccessible( |
| GcsPath.fromComponents("testbucket", "testobject"), |
| mockBackOff, |
| new FastNanoClockAndSleeper())); |
| } |
| |
| @Test |
| public void testBucketDoesNotExist() throws IOException { |
| GcsOptions pipelineOptions = gcsOptionsWithTestCredential(); |
| GcsUtil gcsUtil = pipelineOptions.getGcsUtil(); |
| |
| Storage mockStorage = Mockito.mock(Storage.class); |
| gcsUtil.setStorageClient(mockStorage); |
| |
| Storage.Buckets mockStorageObjects = Mockito.mock(Storage.Buckets.class); |
| Storage.Buckets.Get mockStorageGet = Mockito.mock(Storage.Buckets.Get.class); |
| |
| BackOff mockBackOff = BackOffAdapter.toGcpBackOff(FluentBackoff.DEFAULT.backoff()); |
| |
| when(mockStorage.buckets()).thenReturn(mockStorageObjects); |
| when(mockStorageObjects.get("testbucket")).thenReturn(mockStorageGet); |
| when(mockStorageGet.execute()) |
| .thenThrow( |
| googleJsonResponseException( |
| HttpStatusCodes.STATUS_CODE_NOT_FOUND, "It don't exist", "Nothing here to see")); |
| |
| assertFalse( |
| gcsUtil.bucketAccessible( |
| GcsPath.fromComponents("testbucket", "testobject"), |
| mockBackOff, |
| new FastNanoClockAndSleeper())); |
| } |
| |
| @Test |
| public void testGetBucket() throws IOException { |
| GcsOptions pipelineOptions = gcsOptionsWithTestCredential(); |
| GcsUtil gcsUtil = pipelineOptions.getGcsUtil(); |
| |
| Storage mockStorage = Mockito.mock(Storage.class); |
| gcsUtil.setStorageClient(mockStorage); |
| |
| Storage.Buckets mockStorageObjects = Mockito.mock(Storage.Buckets.class); |
| Storage.Buckets.Get mockStorageGet = Mockito.mock(Storage.Buckets.Get.class); |
| |
| BackOff mockBackOff = BackOffAdapter.toGcpBackOff(FluentBackoff.DEFAULT.backoff()); |
| |
| when(mockStorage.buckets()).thenReturn(mockStorageObjects); |
| when(mockStorageObjects.get("testbucket")).thenReturn(mockStorageGet); |
| when(mockStorageGet.execute()) |
| .thenThrow(new SocketTimeoutException("SocketException")) |
| .thenReturn(new Bucket()); |
| |
| assertNotNull( |
| gcsUtil.getBucket( |
| GcsPath.fromComponents("testbucket", "testobject"), |
| mockBackOff, |
| new FastNanoClockAndSleeper())); |
| } |
| |
| @Test |
| public void testGetBucketNotExists() throws IOException { |
| GcsOptions pipelineOptions = gcsOptionsWithTestCredential(); |
| GcsUtil gcsUtil = pipelineOptions.getGcsUtil(); |
| |
| Storage mockStorage = Mockito.mock(Storage.class); |
| gcsUtil.setStorageClient(mockStorage); |
| |
| Storage.Buckets mockStorageObjects = Mockito.mock(Storage.Buckets.class); |
| Storage.Buckets.Get mockStorageGet = Mockito.mock(Storage.Buckets.Get.class); |
| |
| BackOff mockBackOff = BackOffAdapter.toGcpBackOff(FluentBackoff.DEFAULT.backoff()); |
| |
| when(mockStorage.buckets()).thenReturn(mockStorageObjects); |
| when(mockStorageObjects.get("testbucket")).thenReturn(mockStorageGet); |
| when(mockStorageGet.execute()) |
| .thenThrow( |
| googleJsonResponseException( |
| HttpStatusCodes.STATUS_CODE_NOT_FOUND, "It don't exist", "Nothing here to see")); |
| |
| thrown.expect(FileNotFoundException.class); |
| thrown.expectMessage("It don't exist"); |
| gcsUtil.getBucket( |
| GcsPath.fromComponents("testbucket", "testobject"), |
| mockBackOff, |
| new FastNanoClockAndSleeper()); |
| } |
| |
| @Test |
| public void testGCSChannelCloseIdempotent() throws IOException { |
| GoogleCloudStorageReadOptions readOptions = |
| GoogleCloudStorageReadOptions.builder().setFastFailOnNotFound(false).build(); |
| SeekableByteChannel channel = |
| new GoogleCloudStorageReadChannel( |
| null, "dummybucket", "dummyobject", null, new ClientRequestHelper<>(), readOptions); |
| channel.close(); |
| channel.close(); |
| } |
| |
| /** Builds a fake GoogleJsonResponseException for testing API error handling. */ |
| private static GoogleJsonResponseException googleJsonResponseException( |
| final int status, final String reason, final String message) throws IOException { |
| final JsonFactory jsonFactory = new JacksonFactory(); |
| HttpTransport transport = |
| new MockHttpTransport() { |
| @Override |
| public LowLevelHttpRequest buildRequest(String method, String url) throws IOException { |
| ErrorInfo errorInfo = new ErrorInfo(); |
| errorInfo.setReason(reason); |
| errorInfo.setMessage(message); |
| errorInfo.setFactory(jsonFactory); |
| GenericJson error = new GenericJson(); |
| error.set("code", status); |
| error.set("errors", Arrays.asList(errorInfo)); |
| error.setFactory(jsonFactory); |
| GenericJson errorResponse = new GenericJson(); |
| errorResponse.set("error", error); |
| errorResponse.setFactory(jsonFactory); |
| return new MockLowLevelHttpRequest() |
| .setResponse( |
| new MockLowLevelHttpResponse() |
| .setContent(errorResponse.toPrettyString()) |
| .setContentType(Json.MEDIA_TYPE) |
| .setStatusCode(status)); |
| } |
| }; |
| HttpRequest request = |
| transport.createRequestFactory().buildGetRequest(HttpTesting.SIMPLE_GENERIC_URL); |
| request.setThrowExceptionOnExecuteError(false); |
| HttpResponse response = request.execute(); |
| return GoogleJsonResponseException.from(jsonFactory, response); |
| } |
| |
| private static List<String> makeStrings(String s, int n) { |
| ImmutableList.Builder<String> ret = ImmutableList.builder(); |
| for (int i = 0; i < n; ++i) { |
| ret.add(String.format("gs://bucket/%s%d", s, i)); |
| } |
| return ret.build(); |
| } |
| |
| private static List<GcsPath> makeGcsPaths(String s, int n) { |
| ImmutableList.Builder<GcsPath> ret = ImmutableList.builder(); |
| for (int i = 0; i < n; ++i) { |
| ret.add(GcsPath.fromUri(String.format("gs://bucket/%s%d", s, i))); |
| } |
| return ret.build(); |
| } |
| |
| private static int sumBatchSizes(List<BatchRequest> batches) { |
| int ret = 0; |
| for (BatchRequest b : batches) { |
| ret += b.size(); |
| assertThat(b.size(), greaterThan(0)); |
| } |
| return ret; |
| } |
| |
| @Test |
| public void testMakeRewriteOps() throws IOException { |
| GcsOptions gcsOptions = gcsOptionsWithTestCredential(); |
| GcsUtil gcsUtil = gcsOptions.getGcsUtil(); |
| |
| LinkedList<RewriteOp> rewrites = |
| gcsUtil.makeRewriteOps(makeStrings("s", 1), makeStrings("d", 1)); |
| assertEquals(1, rewrites.size()); |
| |
| RewriteOp rewrite = rewrites.pop(); |
| assertTrue(rewrite.getReadyToEnqueue()); |
| Storage.Objects.Rewrite request = rewrite.rewriteRequest; |
| assertNull(request.getMaxBytesRewrittenPerCall()); |
| assertEquals("bucket", request.getSourceBucket()); |
| assertEquals("s0", request.getSourceObject()); |
| assertEquals("bucket", request.getDestinationBucket()); |
| assertEquals("d0", request.getDestinationObject()); |
| } |
| |
| @Test |
| public void testMakeRewriteOpsWithOptions() throws IOException { |
| GcsOptions gcsOptions = gcsOptionsWithTestCredential(); |
| GcsUtil gcsUtil = gcsOptions.getGcsUtil(); |
| gcsUtil.maxBytesRewrittenPerCall = 1337L; |
| |
| LinkedList<RewriteOp> rewrites = |
| gcsUtil.makeRewriteOps(makeStrings("s", 1), makeStrings("d", 1)); |
| assertEquals(1, rewrites.size()); |
| |
| RewriteOp rewrite = rewrites.pop(); |
| assertTrue(rewrite.getReadyToEnqueue()); |
| Storage.Objects.Rewrite request = rewrite.rewriteRequest; |
| assertEquals(Long.valueOf(1337L), request.getMaxBytesRewrittenPerCall()); |
| } |
| |
| @Test |
| public void testMakeCopyBatches() throws IOException { |
| GcsUtil gcsUtil = gcsOptionsWithTestCredential().getGcsUtil(); |
| |
| // Small number of files fits in 1 batch |
| List<BatchRequest> batches = |
| gcsUtil.makeCopyBatches(gcsUtil.makeRewriteOps(makeStrings("s", 3), makeStrings("d", 3))); |
| assertThat(batches.size(), equalTo(1)); |
| assertThat(sumBatchSizes(batches), equalTo(3)); |
| |
| // 1 batch of files fits in 1 batch |
| batches = |
| gcsUtil.makeCopyBatches( |
| gcsUtil.makeRewriteOps(makeStrings("s", 100), makeStrings("d", 100))); |
| assertThat(batches.size(), equalTo(1)); |
| assertThat(sumBatchSizes(batches), equalTo(100)); |
| |
| // A little more than 5 batches of files fits in 6 batches |
| batches = |
| gcsUtil.makeCopyBatches( |
| gcsUtil.makeRewriteOps(makeStrings("s", 501), makeStrings("d", 501))); |
| assertThat(batches.size(), equalTo(6)); |
| assertThat(sumBatchSizes(batches), equalTo(501)); |
| } |
| |
| @Test |
| public void testMakeRewriteOpsInvalid() throws IOException { |
| GcsUtil gcsUtil = gcsOptionsWithTestCredential().getGcsUtil(); |
| thrown.expect(IllegalArgumentException.class); |
| thrown.expectMessage("Number of source files 3"); |
| |
| gcsUtil.makeRewriteOps(makeStrings("s", 3), makeStrings("d", 1)); |
| } |
| |
| @Test |
| public void testMakeRemoveBatches() throws IOException { |
| GcsUtil gcsUtil = gcsOptionsWithTestCredential().getGcsUtil(); |
| |
| // Small number of files fits in 1 batch |
| List<BatchRequest> batches = gcsUtil.makeRemoveBatches(makeStrings("s", 3)); |
| assertThat(batches.size(), equalTo(1)); |
| assertThat(sumBatchSizes(batches), equalTo(3)); |
| |
| // 1 batch of files fits in 1 batch |
| batches = gcsUtil.makeRemoveBatches(makeStrings("s", 100)); |
| assertThat(batches.size(), equalTo(1)); |
| assertThat(sumBatchSizes(batches), equalTo(100)); |
| |
| // A little more than 5 batches of files fits in 6 batches |
| batches = gcsUtil.makeRemoveBatches(makeStrings("s", 501)); |
| assertThat(batches.size(), equalTo(6)); |
| assertThat(sumBatchSizes(batches), equalTo(501)); |
| } |
| |
| @Test |
| public void testMakeGetBatches() throws IOException { |
| GcsUtil gcsUtil = gcsOptionsWithTestCredential().getGcsUtil(); |
| |
| // Small number of files fits in 1 batch |
| List<StorageObjectOrIOException[]> results = Lists.newArrayList(); |
| List<BatchRequest> batches = gcsUtil.makeGetBatches(makeGcsPaths("s", 3), results); |
| assertThat(batches.size(), equalTo(1)); |
| assertThat(sumBatchSizes(batches), equalTo(3)); |
| assertEquals(3, results.size()); |
| |
| // 1 batch of files fits in 1 batch |
| results = Lists.newArrayList(); |
| batches = gcsUtil.makeGetBatches(makeGcsPaths("s", 100), results); |
| assertThat(batches.size(), equalTo(1)); |
| assertThat(sumBatchSizes(batches), equalTo(100)); |
| assertEquals(100, results.size()); |
| |
| // A little more than 5 batches of files fits in 6 batches |
| results = Lists.newArrayList(); |
| batches = gcsUtil.makeGetBatches(makeGcsPaths("s", 501), results); |
| assertThat(batches.size(), equalTo(6)); |
| assertThat(sumBatchSizes(batches), equalTo(501)); |
| assertEquals(501, results.size()); |
| } |
| |
| /** A helper to wrap a {@link GenericJson} object in a content stream. */ |
| private static InputStream toStream(String content) throws IOException { |
| return new ByteArrayInputStream(content.getBytes(StandardCharsets.UTF_8)); |
| } |
| } |