| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.beam.sdk.extensions.gcp.storage; |
| |
| import static org.hamcrest.Matchers.contains; |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertThat; |
| import static org.mockito.Matchers.anyString; |
| import static org.mockito.Matchers.eq; |
| import static org.mockito.Matchers.isNull; |
| import static org.mockito.Mockito.when; |
| |
| import com.google.api.services.storage.model.Objects; |
| import com.google.api.services.storage.model.StorageObject; |
| import java.io.FileNotFoundException; |
| import java.io.IOException; |
| import java.math.BigInteger; |
| import java.util.ArrayList; |
| import java.util.List; |
| import javax.annotation.Nullable; |
| import org.apache.beam.sdk.extensions.gcp.options.GcsOptions; |
| import org.apache.beam.sdk.extensions.gcp.util.GcsUtil; |
| import org.apache.beam.sdk.extensions.gcp.util.GcsUtil.StorageObjectOrIOException; |
| import org.apache.beam.sdk.extensions.gcp.util.gcsfs.GcsPath; |
| import org.apache.beam.sdk.io.fs.MatchResult; |
| import org.apache.beam.sdk.io.fs.MatchResult.Status; |
| import org.apache.beam.sdk.options.PipelineOptionsFactory; |
| import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.FluentIterable; |
| import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList; |
| import org.junit.Before; |
| import org.junit.Rule; |
| import org.junit.Test; |
| import org.junit.rules.ExpectedException; |
| import org.junit.runner.RunWith; |
| import org.junit.runners.JUnit4; |
| import org.mockito.Mock; |
| import org.mockito.MockitoAnnotations; |
| |
| /** Tests for {@link GcsFileSystem}. */ |
| @RunWith(JUnit4.class) |
| public class GcsFileSystemTest { |
| |
| @Rule public transient ExpectedException thrown = ExpectedException.none(); |
| @Mock private GcsUtil mockGcsUtil; |
| private GcsFileSystem gcsFileSystem; |
| |
| @Before |
| public void setUp() { |
| MockitoAnnotations.initMocks(this); |
| GcsOptions gcsOptions = PipelineOptionsFactory.as(GcsOptions.class); |
| gcsOptions.setGcsUtil(mockGcsUtil); |
| gcsFileSystem = new GcsFileSystem(gcsOptions); |
| } |
| |
| @Test |
| public void testMatch() throws Exception { |
| Objects modelObjects = new Objects(); |
| List<StorageObject> items = new ArrayList<>(); |
| // A directory |
| items.add(new StorageObject().setBucket("testbucket").setName("testdirectory/")); |
| |
| // Files within the directory |
| items.add(createStorageObject("gs://testbucket/testdirectory/file1name", 1L /* fileSize */)); |
| items.add(createStorageObject("gs://testbucket/testdirectory/file2name", 2L /* fileSize */)); |
| items.add(createStorageObject("gs://testbucket/testdirectory/file3name", 3L /* fileSize */)); |
| items.add(createStorageObject("gs://testbucket/testdirectory/file4name", 4L /* fileSize */)); |
| items.add(createStorageObject("gs://testbucket/testdirectory/otherfile", 5L /* fileSize */)); |
| items.add(createStorageObject("gs://testbucket/testdirectory/anotherfile", 6L /* fileSize */)); |
| |
| modelObjects.setItems(items); |
| when(mockGcsUtil.listObjects(eq("testbucket"), anyString(), isNull(String.class))) |
| .thenReturn(modelObjects); |
| |
| List<GcsPath> gcsPaths = |
| ImmutableList.of( |
| GcsPath.fromUri("gs://testbucket/testdirectory/non-exist-file"), |
| GcsPath.fromUri("gs://testbucket/testdirectory/otherfile")); |
| |
| when(mockGcsUtil.getObjects(eq(gcsPaths))) |
| .thenReturn( |
| ImmutableList.of( |
| StorageObjectOrIOException.create(new FileNotFoundException()), |
| StorageObjectOrIOException.create( |
| createStorageObject("gs://testbucket/testdirectory/otherfile", 4L)))); |
| |
| List<String> specs = |
| ImmutableList.of( |
| "gs://testbucket/testdirectory/file[1-3]*", |
| "gs://testbucket/testdirectory/non-exist-file", |
| "gs://testbucket/testdirectory/otherfile"); |
| List<MatchResult> matchResults = gcsFileSystem.match(specs); |
| assertEquals(3, matchResults.size()); |
| assertEquals(Status.OK, matchResults.get(0).status()); |
| assertThat( |
| ImmutableList.of( |
| "gs://testbucket/testdirectory/file1name", |
| "gs://testbucket/testdirectory/file2name", |
| "gs://testbucket/testdirectory/file3name"), |
| contains(toFilenames(matchResults.get(0)).toArray())); |
| assertEquals(Status.NOT_FOUND, matchResults.get(1).status()); |
| assertEquals(Status.OK, matchResults.get(2).status()); |
| assertThat( |
| ImmutableList.of("gs://testbucket/testdirectory/otherfile"), |
| contains(toFilenames(matchResults.get(2)).toArray())); |
| } |
| |
| @Test |
| public void testGlobExpansion() throws IOException { |
| Objects modelObjects = new Objects(); |
| List<StorageObject> items = new ArrayList<>(); |
| // A directory |
| items.add(new StorageObject().setBucket("testbucket").setName("testdirectory/")); |
| |
| // Files within the directory |
| items.add(createStorageObject("gs://testbucket/testdirectory/file1name", 1L /* fileSize */)); |
| items.add(createStorageObject("gs://testbucket/testdirectory/file2name", 2L /* fileSize */)); |
| items.add(createStorageObject("gs://testbucket/testdirectory/file3name", 3L /* fileSize */)); |
| items.add(createStorageObject("gs://testbucket/testdirectory/otherfile", 4L /* fileSize */)); |
| items.add(createStorageObject("gs://testbucket/testdirectory/anotherfile", 5L /* fileSize */)); |
| items.add( |
| createStorageObject("gs://testbucket/testotherdirectory/file4name", 6L /* fileSize */)); |
| |
| modelObjects.setItems(items); |
| |
| when(mockGcsUtil.listObjects(eq("testbucket"), anyString(), isNull(String.class))) |
| .thenReturn(modelObjects); |
| |
| // Test patterns. |
| { |
| GcsPath pattern = GcsPath.fromUri("gs://testbucket/testdirectory/file*"); |
| List<String> expectedFiles = |
| ImmutableList.of( |
| "gs://testbucket/testdirectory/file1name", |
| "gs://testbucket/testdirectory/file2name", |
| "gs://testbucket/testdirectory/file3name"); |
| |
| assertThat(expectedFiles, contains(toFilenames(gcsFileSystem.expand(pattern)).toArray())); |
| } |
| |
| { |
| GcsPath pattern = GcsPath.fromUri("gs://testbucket/testdirectory/file*"); |
| List<String> expectedFiles = |
| ImmutableList.of( |
| "gs://testbucket/testdirectory/file1name", |
| "gs://testbucket/testdirectory/file2name", |
| "gs://testbucket/testdirectory/file3name"); |
| |
| assertThat(expectedFiles, contains(toFilenames(gcsFileSystem.expand(pattern)).toArray())); |
| } |
| |
| { |
| GcsPath pattern = GcsPath.fromUri("gs://testbucket/testdirectory/file[1-3]*"); |
| List<String> expectedFiles = |
| ImmutableList.of( |
| "gs://testbucket/testdirectory/file1name", |
| "gs://testbucket/testdirectory/file2name", |
| "gs://testbucket/testdirectory/file3name"); |
| |
| assertThat(expectedFiles, contains(toFilenames(gcsFileSystem.expand(pattern)).toArray())); |
| } |
| |
| { |
| GcsPath pattern = GcsPath.fromUri("gs://testbucket/testdirectory/file?name"); |
| List<String> expectedFiles = |
| ImmutableList.of( |
| "gs://testbucket/testdirectory/file1name", |
| "gs://testbucket/testdirectory/file2name", |
| "gs://testbucket/testdirectory/file3name"); |
| |
| assertThat(expectedFiles, contains(toFilenames(gcsFileSystem.expand(pattern)).toArray())); |
| } |
| |
| { |
| GcsPath pattern = GcsPath.fromUri("gs://testbucket/test*ectory/fi*name"); |
| List<String> expectedFiles = |
| ImmutableList.of( |
| "gs://testbucket/testdirectory/file1name", |
| "gs://testbucket/testdirectory/file2name", |
| "gs://testbucket/testdirectory/file3name", |
| "gs://testbucket/testotherdirectory/file4name"); |
| |
| assertThat(expectedFiles, contains(toFilenames(gcsFileSystem.expand(pattern)).toArray())); |
| } |
| } |
| |
| @Test |
| public void testExpandNonGlob() throws Exception { |
| thrown.expect(IllegalArgumentException.class); |
| thrown.expectMessage("Glob expression: [testdirectory/otherfile] is not expandable."); |
| gcsFileSystem.expand(GcsPath.fromUri("gs://testbucket/testdirectory/otherfile")); |
| } |
| |
| @Test |
| public void testMatchNonGlobs() throws Exception { |
| List<StorageObjectOrIOException> items = new ArrayList<>(); |
| // Files within the directory |
| items.add( |
| StorageObjectOrIOException.create( |
| createStorageObject("gs://testbucket/testdirectory/file1name", 1L /* fileSize */))); |
| items.add( |
| StorageObjectOrIOException.create( |
| createStorageObject("gs://testbucket/testdirectory/dir2name/", 0L /* fileSize */))); |
| items.add(StorageObjectOrIOException.create(new FileNotFoundException())); |
| items.add(StorageObjectOrIOException.create(new IOException())); |
| items.add( |
| StorageObjectOrIOException.create( |
| createStorageObject("gs://testbucket/testdirectory/file4name", 4L /* fileSize */))); |
| |
| List<GcsPath> gcsPaths = |
| ImmutableList.of( |
| GcsPath.fromUri("gs://testbucket/testdirectory/file1name"), |
| GcsPath.fromUri("gs://testbucket/testdirectory/dir2name/"), |
| GcsPath.fromUri("gs://testbucket/testdirectory/file2name"), |
| GcsPath.fromUri("gs://testbucket/testdirectory/file3name"), |
| GcsPath.fromUri("gs://testbucket/testdirectory/file4name")); |
| |
| when(mockGcsUtil.getObjects(eq(gcsPaths))).thenReturn(items); |
| List<MatchResult> matchResults = gcsFileSystem.matchNonGlobs(gcsPaths); |
| |
| assertEquals(5, matchResults.size()); |
| assertThat( |
| ImmutableList.of("gs://testbucket/testdirectory/file1name"), |
| contains(toFilenames(matchResults.get(0)).toArray())); |
| assertThat( |
| ImmutableList.of("gs://testbucket/testdirectory/dir2name/"), |
| contains(toFilenames(matchResults.get(1)).toArray())); |
| assertEquals(Status.NOT_FOUND, matchResults.get(2).status()); |
| assertEquals(Status.ERROR, matchResults.get(3).status()); |
| assertThat( |
| ImmutableList.of("gs://testbucket/testdirectory/file4name"), |
| contains(toFilenames(matchResults.get(4)).toArray())); |
| } |
| |
| private StorageObject createStorageObject(String gcsFilename, long fileSize) { |
| GcsPath gcsPath = GcsPath.fromUri(gcsFilename); |
| // Google APIs will use null for empty files. |
| @Nullable BigInteger size = (fileSize == 0) ? null : BigInteger.valueOf(fileSize); |
| return new StorageObject() |
| .setBucket(gcsPath.getBucket()) |
| .setName(gcsPath.getObject()) |
| .setSize(size); |
| } |
| |
| private List<String> toFilenames(MatchResult matchResult) throws IOException { |
| return FluentIterable.from(matchResult.metadata()) |
| .transform(metadata -> ((GcsResourceId) metadata.resourceId()).getGcsPath().toString()) |
| .toList(); |
| } |
| } |